"""Multi-phase pipeline over a list of domains.

Phases 1-2 probe each domain over HTTPS, phase 3 downloads the landing page
through a proxy, phases 4-7 parse and classify the responses, and
``get_sankey_info`` summarises how many rows each filter removes.  Every phase
reads the CSV written by the previous one, so phases can be re-run in
isolation.
"""

import json
import re
from datetime import datetime, timezone
from multiprocessing.pool import ThreadPool

import polars as pl
import requests
import urllib3
from bs4 import BeautifulSoup, Comment
from ftlangdetect import detect
from lxml import etree
from tqdm import tqdm

# download_webpage() uses verify=False; silence the warning it would emit
# on every request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def is_https_responding(host):
    """Return True if an HTTPS GET to *host* completes (at most two attempts).

    Any response counts, whatever its status code; redirects are not
    followed.  A KeyboardInterrupt aborts the attempts and yields False.
    """
    for _ in range(2):
        try:
            requests.get(f"https://{host}", timeout=15, allow_redirects=False)
        except KeyboardInterrupt:
            break
        except Exception:
            # Best effort: any failure just means "try again / not responding".
            # Unlike a bare ``except:`` this lets SystemExit propagate.
            continue
        else:
            return True
    return False


def get_location_header(task):
    """Return the ``Location`` header of ``https://<host>``, or None.

    *task* is a ``(host, responding)`` pair; hosts that were not responding
    are skipped without a request.  The request is attempted at most twice
    and redirects are not followed.
    """
    host, responding = task
    if not responding:
        return None

    for _ in range(2):
        try:
            r = requests.get(f"https://{host}", timeout=15, allow_redirects=False)
        except KeyboardInterrupt:
            break
        except Exception:
            # Best effort, see is_https_responding().
            continue
        else:
            return r.headers.get("Location")
    return None


def map_decorator(func):
    """Wrap *func* so that it returns ``(argument, func(argument))``."""

    def _inner(key):
        return key, func(key)

    return _inner


def parallel_map(series, func):
    """Apply *func* to every element of *series* using 16 threads.

    Each distinct value is processed once (elements must be hashable); the
    result list has one entry per element of *series*, in the original order.
    """
    tasks = set(series)
    with ThreadPool(processes=16) as pool:
        # The pool must stay alive while tqdm drains the lazy iterator.
        mapping = dict(
            tqdm(pool.imap_unordered(map_decorator(func), tasks), total=len(tasks))
        )
    return [mapping.get(item) for item in series]


def phase1():
    """Probe every domain in ``input/ai.txt`` for an HTTPS response."""
    df = pl.read_csv("input/ai.txt")
    df = df.with_columns(
        pl.Series("is_https_responding", parallel_map(df["domain"], is_https_responding))
    )
    print(df)
    df.write_csv("intermediate/phase1.csv")


def phase2():
    """Record the ``Location`` header returned by each responding domain."""
    df = pl.read_csv("intermediate/phase1.csv")
    rows = list(df.select("domain", "is_https_responding").iter_rows())
    df = df.with_columns(
        pl.Series("http_location", parallel_map(rows, get_location_header))
    )
    print(df)
    df.write_csv("intermediate/phase2.csv")


USERNAME, PASSWORD = "", ""

# Upper bound for one proxied download.  Without a timeout a stalled
# connection blocks its worker thread forever.
# NOTE(review): 180 s is a deliberately generous guess for rendered pages;
# tune to the proxy provider's recommendation.
DOWNLOAD_TIMEOUT_SECONDS = 180

# Response headers that must not be persisted.  Compared lower-cased because
# HTTP header names are case-insensitive.
_STRIPPED_RESPONSE_HEADERS = frozenset(
    {"set-cookie", "x-oxylabs-client-id", "x-oxylabs-job-id"}
)


def download_webpage(url):
    """Download *url* through the proxy and return the result as a JSON string.

    The JSON object has the keys ``headers``, ``body`` and
    ``utc_scrape_timestamp``.  Returns None when *url* is None or when both
    attempts fail; failures are printed, not raised.  A URL without a scheme
    gets ``https://`` prepended.
    """
    if url is None:
        return None
    if not url.startswith(("http://", "https://")):
        url = f"https://{url}"

    for _ in range(2):
        try:
            response = requests.get(
                url,
                verify=False,
                timeout=DOWNLOAD_TIMEOUT_SECONDS,
                proxies={
                    "http": f"http://{USERNAME}:{PASSWORD}@unblock.oxylabs.io:60000",
                    "https": f"https://{USERNAME}:{PASSWORD}@unblock.oxylabs.io:60000",
                },
                headers={
                    "X-Oxylabs-Render": "html"
                },
            )
            response_headers = {
                name: value
                for name, value in response.headers.items()
                if name.lower() not in _STRIPPED_RESPONSE_HEADERS
            }
            return json.dumps({
                "headers": response_headers,
                "body": response.content.decode("utf8"),
                "utc_scrape_timestamp": datetime.now(tz=timezone.utc).isoformat()
            }, ensure_ascii=False)
        except KeyboardInterrupt:
            break
        except Exception as ex:
            print(ex)
    return None


def phase3():
    """Choose the URL to visit for each domain and download it.

    Rows that did not respond get no URL.  Rows without a ``Location`` header
    use ``https://<domain>``.  Rows with a ``Location`` header use it only if
    it matches the ``.ai`` pattern below; otherwise they get no URL.
    """
    df = pl.read_csv("intermediate/phase2.csv")
    df = df.with_columns(
        url=(
            pl.when(~pl.col("is_https_responding")).then(None)
            .when(pl.col("http_location").is_null()).then("https://" + pl.col("domain"))
            .when(pl.col("http_location").str.contains(r"\.ai($|/|#)")).then(pl.col("http_location"))
            .otherwise(None)
        )
    )
    print(f"Visiting {df['url'].count()} URLs ({df['url'].n_unique()} unique)")
    df = df.with_columns(
        pl.Series("response", parallel_map(df["url"], download_webpage))
    )
    df.write_csv("intermediate/phase3.csv")


def _content_type(headers_json):
    """Return the Content-Type value from a JSON-encoded header mapping.

    The lookup ignores the case of the header name; returns None when the
    header is absent.
    """
    for name, value in json.loads(headers_json).items():
        if name.lower() == "content-type":
            return value
    return None


def phase4():
    """Unpack the JSON ``response`` column and flag valid HTML responses.

    A response is valid when it has a non-empty body and a content type that
    starts with ``text/html``.
    """
    df = pl.read_csv("intermediate/phase3.csv", infer_schema=False)
    df = df.with_columns(
        response_headers=pl.col("response").map_elements(
            lambda x: json.dumps(json.loads(x)["headers"]), return_dtype=pl.String
        ),
        response_body=pl.col("response").map_elements(
            lambda x: json.loads(x)["body"], return_dtype=pl.String
        ),
        # Read the timestamp field itself, not the page body.
        utc_scrape_timestamp=pl.col("response").map_elements(
            lambda x: json.loads(x)["utc_scrape_timestamp"], return_dtype=pl.String
        ),
    ).drop(
        "response"
    ).with_columns(
        response_body_length=pl.col("response_body").str.len_bytes(),
        response_content_type=pl.col("response_headers").map_elements(
            _content_type, return_dtype=pl.String
        ),
    ).with_columns(
        is_valid_response=(
            (pl.col("response_body_length").is_not_null())
            & (pl.col("response_body_length") > 0)
            & (pl.col("response_content_type").is_not_null())
            & (pl.col("response_content_type").str.starts_with("text/html"))
        )
    )
    df.write_csv("intermediate/phase4.csv")


ELEMENTS_TO_STRIP = [
    "style",
    "script",
    "svg",
    "canvas",
    "img",
]


def _get_html_text(html):
    """Return the whitespace-normalised visible text of *html*, or None.

    Elements listed in ELEMENTS_TO_STRIP and HTML comments are removed before
    the text is extracted.  None is returned for empty input or when lxml
    cannot produce a root element.
    """
    if not html:
        return None

    # Use html5lib; better at handling invalid HTML
    soup = BeautifulSoup(html, "html5lib")
    for tag in soup.find_all(ELEMENTS_TO_STRIP):
        tag.decompose()
    # ``string=`` is the current name of the deprecated ``text=`` argument,
    # and extract() is the documented way to drop a comment string.
    for comment in soup.find_all(string=lambda x: isinstance(x, Comment)):
        comment.extract()
    html = soup.prettify()

    # Use lxml; better at extracting text
    parser = etree.XMLParser(recover=True)
    root = etree.fromstring(html, parser)
    if root is None:
        return None

    text = " ".join(etree.XPath("//text()")(root)).strip()
    text = re.sub(r"\s+", " ", text)
    return text


def phase5():
    """Extract plain text from every response body."""
    df = pl.read_csv("intermediate/phase4.csv", infer_schema=False)
    df = df.with_columns(
        pl.Series("response_body_parsed", parallel_map(df["response_body"], _get_html_text))
    )
    df.write_csv("intermediate/phase5.csv")


# Lower-case substrings; phase6() marks a page as parked if its text contains
# any of them.  Keep one string per line WITH a trailing comma: adjacent
# literals without a comma are silently concatenated into one.
PARKED_STRINGS = [
    ".ai for sale",
    ".ai is for sale",
    ".ai está à venda",
    ".ai está a la venta",
    "this website is for sale",
    "this domain is for sale",
    "this domain name is for sale",
    "domains for sale:",
    "premium domain for sale",
    ".ai may be for sale",
    ".ai page under construction",
    "this domain is registered, but may still be available",
    "this site is not published or does not have a domain assigned to it",
    "there are a few potential reasons: you haven't deployed an app yet",
    "the specified bucket does not exist",
    "looks like this domain isn't connected to a website yet",
    "there isn't a github pages site here",
    "this deployment cannot be found",
    "this domain has been mapped to squarespace, but it has not yet been",
    ".ai is parked free",
    "porkbun.com",
    "spaceship.com",
    "parked domain name",
    "0.ai to 9.ai",
    "404 not found the resource requested could not be found",
    "404 not found 404 not found",
    "404 page not found",
]


def phase6():
    """Flag parked pages and pages with fewer than 15 distinct words."""
    df = pl.read_csv("intermediate/phase5.csv", infer_schema=False)
    lowered = pl.col("response_body_parsed").str.to_lowercase()
    df = df.with_columns(
        is_parked=lowered.str.contains_any(PARKED_STRINGS),
        is_short=lowered.str.split(" ").list.unique().list.len().lt(15),
    )
    df.write_csv("intermediate/phase6.csv")


def phase7():
    """Detect each page's language and compute ``considered_for_analysis``.

    The result is written to ``output/output.csv``.
    """
    df = pl.read_csv("intermediate/phase6.csv", infer_schema=False)

    # Call detect() once on the main thread before fanning out to workers.
    detect(text="This will download the model...")

    def _detect_lang(text):
        if not text:
            return None
        lang = detect(text=text, low_memory=False)
        return lang["lang"]

    df = df.with_columns(
        pl.Series("response_body_lang", parallel_map(df["response_body_parsed"], _detect_lang))
    )
    # The CSV was read with infer_schema=False, so booleans are the strings
    # "true"/"false".
    df = df.with_columns(
        considered_for_analysis=(
            (pl.col("is_https_responding") == "true")
            & pl.col("url").is_unique()
            & (pl.col("is_valid_response") == "true")
            & (pl.col("is_parked") == "false")
            & (pl.col("is_short") == "false")
            & (pl.col("response_body_lang") == "en")
        )
    )
    df.write_csv("output/output.csv")


def get_sankey_info():
    """Build Sankey nodes and links from the final pipeline output.

    Returns:
        ``(nodes, links)``: ``nodes`` is a list of ``{"id", "color"}`` dicts
        and ``links`` a list of ``{"source", "target", "value"}`` dicts, where
        ``value`` is a row count.
    """
    nodes = []
    links = []

    def add_node(node_id, color):
        nodes.append({"id": node_id, "color": color})

    def add_link(source, target, value):
        links.append({"source": source, "target": target, "value": value})

    # phase7() writes its result here; no phase produces
    # "intermediate/phase7.csv".
    df = pl.read_csv("output/output.csv", infer_schema=False)

    add_node("Initial list", "#7f7f7f")
    add_node("Connection error", "#e15759")
    add_link(
        "Initial list",
        "Connection error",
        len(df.filter(pl.col("is_https_responding") == "false")),
    )

    df = df.filter(pl.col("is_https_responding") == "true")
    add_node("HTTPS Connection", "#739d6e")
    add_link("Initial list", "HTTPS Connection", len(df))

    add_node("Duplicate URL after redirect", "#e15759")
    add_node("Non-HTML response", "#e15759")
    add_link(
        "HTTPS Connection",
        "Duplicate URL after redirect",
        len(df.filter(~pl.col("url").is_unique())),
    )
    add_link(
        "HTTPS Connection",
        "Non-HTML response",
        len(df.filter(pl.col("is_valid_response") == "false")),
    )

    # NOTE(review): no link is emitted INTO "Valid HTML response" or
    # "Valid content", so those nodes have no incoming flow -- confirm
    # whether that is intended before adding links here.
    df = df.filter(pl.col("is_valid_response") == "true")
    add_node("Valid HTML response", "#739d6e")
    add_node("Parked domain", "#e15759")
    add_node("Content too short", "#e15759")
    add_link(
        "Valid HTML response",
        "Parked domain",
        len(df.filter(pl.col("is_parked") == "true")),
    )
    add_link(
        "Valid HTML response",
        "Content too short",
        len(df.filter(pl.col("is_short") == "true")),
    )

    df = df.filter((pl.col("is_short") == "false") & (pl.col("is_parked") == "false"))
    add_node("Valid content", "#739d6e")
    add_node("Non-English content", "#e15759")
    add_node("Considered for analysis", "#59a14f")
    add_link(
        "Valid content",
        "Non-English content",
        len(df.filter(pl.col("response_body_lang") != "en")),
    )
    add_link(
        "Valid content",
        "Considered for analysis",
        len(df.filter(pl.col("considered_for_analysis") == "true")),
    )

    return nodes, links


if __name__ == "__main__":
    phase1()
    phase2()
    phase3()
    phase4()
    phase5()
    phase6()
    phase7()
    nodes, links = get_sankey_info()
    print(nodes)
    print(links)
    print("Done!")