Distributed Wikipedia Crawler System Design
Problem
Design a system to crawl and copy all of Wikipedia using a distributed network of machines.
More specifically, suppose your server has access to a set of client machines. Your client machines can execute code you have written to access Wikipedia pages, download and parse their data, and write the results to a database.
Some questions you may want to consider as part of your solution are:
- How will you reach as many pages as possible?
- How can you keep track of pages that have already been visited?
- How will you deal with your client machines being blacklisted?
- How can you update your database when Wikipedia pages are added or updated?
Examples
Example 1
Input:
- Starting URLs: ["https://en.wikipedia.org/wiki/Main_Page"]
- Client machines: 100
- Target: Complete English Wikipedia crawl
Output:
- Crawled pages: ~6.7 million articles
- Database entries: Article content, metadata, links
- Time: ~24-48 hours with proper rate limiting
Explanation:
Starting from the Main Page, discover other pages by following links in BFS order.
Use distributed queue system to coordinate work across 100 machines.
Implement rate limiting to avoid being blocked by Wikipedia.
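To make the BFS-plus-visited-set idea concrete before the distributed designs below, here is a minimal single-machine sketch in Python; fetch_links is a hypothetical stand-in for the download-and-parse step, and the distributed versions replace the local deque and set with shared queues and a shared visited store.
from collections import deque
from typing import Callable, Iterable, Set

def bfs_crawl(seed_urls: Iterable[str],
              fetch_links: Callable[[str], Iterable[str]],
              max_pages: int = 1000) -> Set[str]:
    """Breadth-first traversal from the seed URLs with a visited set."""
    visited: Set[str] = set()
    frontier = deque(seed_urls)
    while frontier and len(visited) < max_pages:
        url = frontier.popleft()
        if url in visited:
            continue
        visited.add(url)
        for link in fetch_links(url):   # download + parse happens here
            if link not in visited:
                frontier.append(link)
    return visited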
Example 2
Input:
- Starting URLs: Multiple language Wikipedia main pages
- Client machines: 500
- Target: Multi-language Wikipedia crawl
Output:
- Crawled pages: ~60+ million articles across languages
- Database schema: Language-specific tables with cross-references
- Incremental updates: Daily delta processing
Explanation:
Separate crawling queues per language to handle different update frequencies.
Use language-specific crawlers with appropriate character encoding.
Implement change detection through Wikipedia's API and revision tracking.
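As a rough illustration of the per-language queues, the sketch below routes URLs into separate in-process work queues keyed by the language subdomain; the routing rule and queue type are illustrative assumptions, since a real deployment would use shared, persistent queues.
import queue
from typing import Dict
from urllib.parse import urlparse

# One work queue per language edition, keyed by subdomain ("en", "de", ...).
language_queues: Dict[str, queue.Queue] = {}

def route_url(url: str) -> None:
    """Put a URL on the queue for its language edition."""
    host = urlparse(url).netloc               # e.g. "de.wikipedia.org"
    lang = host.split(".")[0] if host else "en"
    language_queues.setdefault(lang, queue.Queue()).put(url)

route_url("https://de.wikipedia.org/wiki/Berlin")
route_url("https://en.wikipedia.org/wiki/Main_Page")
print(sorted(language_queues))                # ['de', 'en']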
Example 3
Input:
- Scenario: 50% of client machines get IP-banned
- Remaining machines: 50
- Requirement: Continue crawling without interruption
Output:
- Failover time: <5 minutes
- Crawl completion: Delayed but successful
- IP rotation: Automatic proxy switching
Explanation:
Implement IP rotation with proxy pools and VPN services.
Use exponential backoff for banned IPs with automatic retry.
Redistribute work from failed machines to healthy ones.
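A minimal sketch of the exponential backoff mentioned above; the retry count, base delay, cap, and jitter are illustrative values rather than tuned numbers.
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_backoff(operation: Callable[[], T],
                 max_retries: int = 5,
                 base_delay: float = 1.0,
                 max_delay: float = 300.0) -> T:
    """Retry an operation, doubling the delay (plus jitter) after each failure."""
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(delay + random.uniform(0, delay * 0.1))
    raise ValueError("max_retries must be at least 1")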
Example 4
Input:
- Existing database: 6.5M articles as of yesterday
- New Wikipedia state: 6.502M articles (2K new, 500 updated)
- Update frequency: Every 6 hours
Output:
- Incremental crawl: Only 2.5K pages processed
- Update time: 15 minutes vs 48 hours for full crawl
- Consistency: 99.9% synchronization with Wikipedia
Explanation:
Use Wikipedia's Recent Changes API to identify modified pages.
Compare timestamps and revision IDs to detect updates.
Priority queue for new/updated pages over full re-crawl.
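The revision comparison described above reduces to a small pure function: given the revisions already stored and a batch of change events, keep only the pages whose reported revision is ahead of ours. The event format here is a simplified assumption; Method 2 below shows the full flow against the live API.
from typing import Dict, List, Tuple

# Each change event is (page_title, new_revision_id).
def pages_to_recrawl(stored_revisions: Dict[str, int],
                     changes: List[Tuple[str, int]]) -> List[str]:
    """Return titles whose reported revision is newer than the stored one."""
    stale = []
    for title, new_rev in changes:
        if stored_revisions.get(title, 0) < new_rev:
            stale.append(title)
    return stale

stored = {"Python (programming language)": 1180, "Go (programming language)": 990}
events = [("Python (programming language)", 1184), ("Go (programming language)", 990)]
print(pages_to_recrawl(stored, events))   # ['Python (programming language)']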
Example 5
Input:
- Memory constraint: 8GB RAM per machine
- Page size: Average 50KB per article
- Concurrent connections: 100 per machine
Output:
- Memory usage: ~5MB for page buffers + 2GB for URL queue
- Throughput: 10 pages/second per machine
- Queue size: 1M URLs buffered locally per machine
Explanation:
Implement memory-efficient streaming processing.
Use disk-based queues for large URL lists.
Batch database writes to optimize I/O operations.
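To illustrate the batched-write point, the sketch below buffers parsed pages in memory and flushes them to SQLite in a single executemany call per batch; SQLite and the batch size of 500 are stand-ins for whatever database and tuning a real deployment would use.
import sqlite3
from typing import List, Tuple

class BatchedWriter:
    """Buffer (url, content) rows and write them in batches to cut down on I/O."""

    def __init__(self, db_path: str = "crawl.db", batch_size: int = 500):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS pages (url TEXT PRIMARY KEY, content TEXT)"
        )
        self.batch_size = batch_size
        self.buffer: List[Tuple[str, str]] = []

    def add(self, url: str, content: str) -> None:
        self.buffer.append((url, content))
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        self.conn.executemany(
            "INSERT OR REPLACE INTO pages (url, content) VALUES (?, ?)", self.buffer
        )
        self.conn.commit()
        self.buffer.clear()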
Solution
Method 1 - Master-Worker Architecture with Distributed Queue
Intuition
A distributed web crawler requires coordination between multiple machines to avoid duplicate work while maximizing coverage. The key insight is to use a master-worker pattern where a central coordinator assigns work to client machines through distributed queues, while maintaining a global visited set to prevent redundant crawling.
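In the listings below the visited set lives in the master's memory; the approach calls for a shared store such as Redis instead. Here is a minimal sketch of that shared set, assuming the redis-py package, a Redis server on localhost, and an arbitrary key name: SADD returns 1 only for the first caller to add a URL, which is exactly the atomic test-and-set the workers need.
import redis  # assumes the redis-py package and a running Redis server

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def claim_url(url: str) -> bool:
    """Atomically mark a URL as visited; True only for the first caller."""
    return r.sadd("crawler:visited", url) == 1

# Only one worker in the whole cluster gets True for a given URL.
if claim_url("https://en.wikipedia.org/wiki/Main_Page"):
    pass  # this worker crawls the page; every other worker skips it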
Approach
- Master Server Setup: Initialize central coordinator with seed URLs and client machine registry
- Distributed Queue System: Use message queue (Redis/RabbitMQ) to distribute URLs to worker machines
- Visited Set Management: Maintain distributed hash set (Redis) to track crawled pages across all workers
- Worker Process: Each client machine polls for URLs, crawls pages, extracts links, and stores results
- Rate Limiting: Implement distributed rate limiting to respect Wikipedia's robots.txt and avoid IP bans (a minimal sketch follows this list)
- Fault Tolerance: Handle machine failures through heartbeats and work redistribution
- Incremental Updates: Use Wikipedia's API to detect changes and schedule re-crawls
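For the rate-limiting bullet, one simple distributed scheme piggybacks on the same Redis instance: SET with NX and EX either claims a short per-domain slot or tells the worker to back off. This is only a sketch under the same redis-py assumption as above; a token bucket, or leaning on the API's maxlag parameter, would work as well.
import time

import redis  # assumes redis-py and a shared Redis instance

r = redis.Redis(host="localhost", port=6379)

def acquire_domain_slot(domain: str, min_interval_s: int = 1) -> bool:
    """Return True if this worker may hit the domain now; False means back off."""
    # SET ... NX EX atomically creates the key only if it does not already exist
    # and lets it expire after min_interval_s, so at most one request per
    # interval is allowed across the whole cluster.
    return bool(r.set(f"ratelimit:{domain}", "1", nx=True, ex=min_interval_s))

def wait_for_slot(domain: str) -> None:
    while not acquire_domain_slot(domain):
        time.sleep(0.1)

wait_for_slot("en.wikipedia.org")   # safe to issue one request now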
Code
C++
#include <chrono>
#include <ctime>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
using namespace std;

// HTTP helpers (makeHttpRequest, HttpException with a statusCode field, and a
// response type with a body field) are assumed to be provided elsewhere; the
// listing focuses on the coordination logic.

// Declared before the crawler so the nested classes below can use it as a
// complete type.
struct CrawlResult {
    string url;
    string content;
    vector<string> extractedLinks;
    time_t timestamp;
    bool success;
    bool isBlacklisted;
};

class DistributedWikipediaCrawler {
private:
    struct CrawlTask {
        string url;
        int priority;
        time_t scheduledTime;
        int retryCount;

        // Ordering for priority_queue: tasks with a higher priority value are
        // served first.
        bool operator<(const CrawlTask& other) const {
            return priority < other.priority;
        }
    };
struct MachineStatus {
string machineId;
string ipAddress;
time_t lastHeartbeat;
bool isBlacklisted;
int crawlRate;
};
class MasterServer {
private:
unordered_set<string> visitedUrls;
priority_queue<CrawlTask> taskQueue;
unordered_map<string, MachineStatus> machines;
unordered_map<string, time_t> rateLimits;
public:
void initializeCrawl(vector<string>& seedUrls) {
for (const auto& url : seedUrls) {
taskQueue.push({url, 1, time(nullptr), 0});
}
}
CrawlTask assignTask(const string& machineId) {
updateMachineHeartbeat(machineId);
if (taskQueue.empty()) {
return {"", 0, 0, 0};
}
auto task = taskQueue.top();
taskQueue.pop();
// Check rate limiting
string domain = extractDomain(task.url);
if (rateLimits[domain] > time(nullptr)) {
// Re-queue with delay
task.scheduledTime = rateLimits[domain] + 1;
taskQueue.push(task);
return assignTask(machineId);
}
visitedUrls.insert(task.url);
rateLimits[domain] = time(nullptr) + 1; // 1 second delay
return task;
}
void submitResults(const string& machineId, const CrawlResult& result) {
// Store in database
storePageContent(result);
// Extract and queue new URLs
for (const auto& link : result.extractedLinks) {
if (visitedUrls.find(link) == visitedUrls.end()) {
taskQueue.push({link, 2, time(nullptr), 0});
}
}
}
void handleMachineFailure(const string& machineId) {
machines[machineId].isBlacklisted = true;
// Redistribute pending tasks
redistributeTasks(machineId);
}
private:
string extractDomain(const string& url) {
size_t start = url.find("://") + 3;
size_t end = url.find("/", start);
return url.substr(start, end - start);
}
void redistributeTasks(const string& failedMachine) {
// Implementation for task redistribution
}
void updateMachineHeartbeat(const string& machineId) {
machines[machineId].lastHeartbeat = time(nullptr);
}
void storePageContent(const CrawlResult& result) {
// Database storage implementation
}
};
class WorkerMachine {
private:
string machineId;
MasterServer* master;
vector<string> proxyList;
int currentProxyIndex;
public:
void startCrawling() {
while (true) {
auto task = master->assignTask(machineId);
if (task.url.empty()) {
this_thread::sleep_for(chrono::seconds(5));
continue;
}
auto result = crawlPage(task.url);
if (result.success) {
master->submitResults(machineId, result);
} else if (result.isBlacklisted) {
rotateProxy();
master->handleMachineFailure(machineId);
}
// Rate limiting
this_thread::sleep_for(chrono::milliseconds(100));
}
}
private:
CrawlResult crawlPage(const string& url) {
// HTTP request implementation with proxy support
CrawlResult result;
try {
auto response = makeHttpRequest(url);
result.content = response.body;
result.extractedLinks = parseLinks(response.body);
result.success = true;
} catch (const HttpException& e) {
if (e.statusCode == 429 || e.statusCode == 403) {
result.isBlacklisted = true;
}
result.success = false;
}
return result;
}
void rotateProxy() {
currentProxyIndex = (currentProxyIndex + 1) % proxyList.size();
}
vector<string> parseLinks(const string& html) {
vector<string> links;
// HTML parsing implementation
return links;
}
};
public:
void runDistributedCrawl(vector<string>& seedUrls, int numMachines) {
MasterServer master;
master.initializeCrawl(seedUrls);
vector<thread> workers;
for (int i = 0; i < numMachines; i++) {
workers.emplace_back([&master, i]() {
WorkerMachine worker;
worker.startCrawling();
});
}
for (auto& worker : workers) {
worker.join();
}
}
};
// CrawlResult is declared above, before DistributedWikipediaCrawler, so that
// the nested MasterServer and WorkerMachine classes can use it as a complete type.
Go
package main

import (
	"fmt"
	"io"
	"net/http"
	"regexp"
	"strings"
	"sync"
	"time"
)

type CrawlTask struct {
URL string
Priority int
ScheduledTime time.Time
RetryCount int
}
type MachineStatus struct {
MachineID string
IPAddress string
LastHeartbeat time.Time
IsBlacklisted bool
CrawlRate int
}
type CrawlResult struct {
URL string
Content string
ExtractedLinks []string
Timestamp time.Time
Success bool
IsBlacklisted bool
}
type MasterServer struct {
visitedUrls map[string]bool
taskQueue chan CrawlTask
machines map[string]*MachineStatus
rateLimits map[string]time.Time
mutex sync.RWMutex
}
func NewMasterServer() *MasterServer {
return &MasterServer{
visitedUrls: make(map[string]bool),
taskQueue: make(chan CrawlTask, 1000000),
machines: make(map[string]*MachineStatus),
rateLimits: make(map[string]time.Time),
}
}
func (ms *MasterServer) InitializeCrawl(seedUrls []string) {
for _, url := range seedUrls {
ms.taskQueue <- CrawlTask{
URL: url,
Priority: 1,
ScheduledTime: time.Now(),
RetryCount: 0,
}
}
}
func (ms *MasterServer) AssignTask(machineID string) (CrawlTask, error) {
ms.updateMachineHeartbeat(machineID)
select {
case task := <-ms.taskQueue:
domain := ms.extractDomain(task.URL)
ms.mutex.Lock()
if rateLimit, exists := ms.rateLimits[domain]; exists && time.Now().Before(rateLimit) {
// Re-queue with delay
task.ScheduledTime = rateLimit.Add(time.Second)
ms.taskQueue <- task
ms.mutex.Unlock()
return ms.AssignTask(machineID)
}
ms.visitedUrls[task.URL] = true
ms.rateLimits[domain] = time.Now().Add(time.Second)
ms.mutex.Unlock()
return task, nil
case <-time.After(5 * time.Second):
return CrawlTask{}, fmt.Errorf("no tasks available")
}
}
func (ms *MasterServer) SubmitResults(machineID string, result CrawlResult) {
// Store in database
ms.storePageContent(result)
// Extract and queue new URLs
for _, link := range result.ExtractedLinks {
ms.mutex.RLock()
if !ms.visitedUrls[link] {
ms.taskQueue <- CrawlTask{
URL: link,
Priority: 2,
ScheduledTime: time.Now(),
RetryCount: 0,
}
}
ms.mutex.RUnlock()
}
}
func (ms *MasterServer) HandleMachineFailure(machineID string) {
ms.mutex.Lock()
if machine, exists := ms.machines[machineID]; exists {
machine.IsBlacklisted = true
}
ms.mutex.Unlock()
// Redistribute pending tasks
ms.redistributeTasks(machineID)
}
func (ms *MasterServer) extractDomain(url string) string {
if idx := strings.Index(url, "://"); idx != -1 {
url = url[idx+3:]
}
if idx := strings.Index(url, "/"); idx != -1 {
url = url[:idx]
}
return url
}
func (ms *MasterServer) updateMachineHeartbeat(machineID string) {
ms.mutex.Lock()
if machine, exists := ms.machines[machineID]; exists {
machine.LastHeartbeat = time.Now()
} else {
ms.machines[machineID] = &MachineStatus{
MachineID: machineID,
LastHeartbeat: time.Now(),
IsBlacklisted: false,
}
}
ms.mutex.Unlock()
}
func (ms *MasterServer) storePageContent(result CrawlResult) {
// Database storage implementation
}
func (ms *MasterServer) redistributeTasks(failedMachine string) {
// Implementation for task redistribution
}
type WorkerMachine struct {
machineID string
master *MasterServer
proxyList []string
currentProxyIndex int
client *http.Client
}
func NewWorkerMachine(machineID string, master *MasterServer) *WorkerMachine {
return &WorkerMachine{
machineID: machineID,
master: master,
proxyList: []string{"proxy1.com", "proxy2.com", "proxy3.com"},
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (wm *WorkerMachine) StartCrawling() {
for {
task, err := wm.master.AssignTask(wm.machineID)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
result := wm.crawlPage(task.URL)
if result.Success {
wm.master.SubmitResults(wm.machineID, result)
} else if result.IsBlacklisted {
wm.rotateProxy()
wm.master.HandleMachineFailure(wm.machineID)
}
// Rate limiting
time.Sleep(100 * time.Millisecond)
}
}
func (wm *WorkerMachine) crawlPage(url string) CrawlResult {
result := CrawlResult{
URL: url,
Timestamp: time.Now(),
}
resp, err := wm.client.Get(url)
if err != nil {
result.Success = false
return result
}
defer resp.Body.Close()
if resp.StatusCode == 429 || resp.StatusCode == 403 {
result.IsBlacklisted = true
result.Success = false
return result
}
body, err := io.ReadAll(resp.Body)
if err != nil {
result.Success = false
return result
}
result.Content = string(body)
result.ExtractedLinks = wm.parseLinks(result.Content)
result.Success = true
return result
}
func (wm *WorkerMachine) rotateProxy() {
wm.currentProxyIndex = (wm.currentProxyIndex + 1) % len(wm.proxyList)
// Update HTTP client with new proxy
}
func (wm *WorkerMachine) parseLinks(html string) []string {
var links []string
// HTML parsing implementation using regex or parser
re := regexp.MustCompile(`href="(/wiki/[^"]*)"`)
matches := re.FindAllStringSubmatch(html, -1)
for _, match := range matches {
if len(match) > 1 {
links = append(links, "https://en.wikipedia.org"+match[1])
}
}
return links
}
func RunDistributedCrawl(seedUrls []string, numMachines int) {
master := NewMasterServer()
master.InitializeCrawl(seedUrls)
var wg sync.WaitGroup
for i := 0; i < numMachines; i++ {
wg.Add(1)
go func(machineID string) {
defer wg.Done()
worker := NewWorkerMachine(machineID, master)
worker.StartCrawling()
}(fmt.Sprintf("machine-%d", i))
}
wg.Wait()
}
Java
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DistributedWikipediaCrawler {
static class CrawlTask {
String url;
int priority;
long scheduledTime;
int retryCount;
CrawlTask(String url, int priority, long scheduledTime, int retryCount) {
this.url = url;
this.priority = priority;
this.scheduledTime = scheduledTime;
this.retryCount = retryCount;
}
}
static class MachineStatus {
String machineId;
String ipAddress;
long lastHeartbeat;
boolean isBlacklisted;
int crawlRate;
MachineStatus(String machineId, String ipAddress) {
this.machineId = machineId;
this.ipAddress = ipAddress;
this.lastHeartbeat = System.currentTimeMillis();
this.isBlacklisted = false;
this.crawlRate = 10;
}
}
static class CrawlResult {
String url;
String content;
List<String> extractedLinks;
long timestamp;
boolean success;
boolean isBlacklisted;
CrawlResult(String url) {
this.url = url;
this.extractedLinks = new ArrayList<>();
this.timestamp = System.currentTimeMillis();
this.success = false;
this.isBlacklisted = false;
}
}
static class MasterServer {
private final Set<String> visitedUrls = ConcurrentHashMap.newKeySet();
private final BlockingQueue<CrawlTask> taskQueue = new LinkedBlockingQueue<>();
private final Map<String, MachineStatus> machines = new ConcurrentHashMap<>();
private final Map<String, Long> rateLimits = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
public void initializeCrawl(List<String> seedUrls) {
for (String url : seedUrls) {
taskQueue.offer(new CrawlTask(url, 1, System.currentTimeMillis(), 0));
}
}
public CrawlTask assignTask(String machineId) throws InterruptedException {
updateMachineHeartbeat(machineId);
CrawlTask task = taskQueue.take();
String domain = extractDomain(task.url);
Long rateLimit = rateLimits.get(domain);
if (rateLimit != null && System.currentTimeMillis() < rateLimit) {
task.scheduledTime = rateLimit + 1000;
taskQueue.offer(task);
return assignTask(machineId);
}
visitedUrls.add(task.url);
rateLimits.put(domain, System.currentTimeMillis() + 1000);
return task;
}
public void submitResults(String machineId, CrawlResult result) {
storePageContent(result);
for (String link : result.extractedLinks) {
if (!visitedUrls.contains(link)) {
taskQueue.offer(new CrawlTask(link, 2, System.currentTimeMillis(), 0));
}
}
}
public void handleMachineFailure(String machineId) {
MachineStatus machine = machines.get(machineId);
if (machine != null) {
machine.isBlacklisted = true;
}
redistributeTasks(machineId);
}
private String extractDomain(String url) {
try {
URL u = new URL(url);
return u.getHost();
} catch (Exception e) {
return "unknown";
}
}
private void updateMachineHeartbeat(String machineId) {
machines.computeIfAbsent(machineId, id -> new MachineStatus(id, "unknown"))
.lastHeartbeat = System.currentTimeMillis();
}
private void storePageContent(CrawlResult result) {
// Database storage implementation
}
private void redistributeTasks(String failedMachine) {
// Implementation for task redistribution
}
}
static class WorkerMachine {
private final String machineId;
private final MasterServer master;
private final List<String> proxyList;
private int currentProxyIndex;
private final HttpClient client;
public WorkerMachine(String machineId, MasterServer master) {
this.machineId = machineId;
this.master = master;
this.proxyList = Arrays.asList("proxy1.com", "proxy2.com", "proxy3.com");
this.currentProxyIndex = 0;
this.client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(30))
.build();
}
public void startCrawling() {
while (!Thread.currentThread().isInterrupted()) {
try {
CrawlTask task = master.assignTask(machineId);
CrawlResult result = crawlPage(task.url);
if (result.success) {
master.submitResults(machineId, result);
} else if (result.isBlacklisted) {
rotateProxy();
master.handleMachineFailure(machineId);
}
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private CrawlResult crawlPage(String url) {
CrawlResult result = new CrawlResult(url);
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.timeout(Duration.ofSeconds(30))
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 429 || response.statusCode() == 403) {
result.isBlacklisted = true;
return result;
}
result.content = response.body();
result.extractedLinks = parseLinks(result.content);
result.success = true;
} catch (Exception e) {
result.success = false;
}
return result;
}
private void rotateProxy() {
currentProxyIndex = (currentProxyIndex + 1) % proxyList.size();
}
private List<String> parseLinks(String html) {
List<String> links = new ArrayList<>();
Pattern pattern = Pattern.compile("href=\"(/wiki/[^\"]*)\"");
Matcher matcher = pattern.matcher(html);
while (matcher.find()) {
links.add("https://en.wikipedia.org" + matcher.group(1));
}
return links;
}
}
public void runDistributedCrawl(List<String> seedUrls, int numMachines) {
MasterServer master = new MasterServer();
master.initializeCrawl(seedUrls);
ExecutorService workers = Executors.newFixedThreadPool(numMachines);
for (int i = 0; i < numMachines; i++) {
final String machineId = "machine-" + i;
workers.submit(() -> {
WorkerMachine worker = new WorkerMachine(machineId, master);
worker.startCrawling();
});
}
}
}
Python
import queue
import threading
import time
from typing import Dict, List, Optional, Set

import requests


class DistributedWikipediaCrawler:
class CrawlTask:
def __init__(self, url: str, priority: int, scheduled_time: float, retry_count: int):
self.url = url
self.priority = priority
self.scheduled_time = scheduled_time
self.retry_count = retry_count
class MachineStatus:
def __init__(self, machine_id: str, ip_address: str):
self.machine_id = machine_id
self.ip_address = ip_address
self.last_heartbeat = time.time()
self.is_blacklisted = False
self.crawl_rate = 10
class CrawlResult:
def __init__(self, url: str):
self.url = url
self.content = ""
self.extracted_links: List[str] = []
self.timestamp = time.time()
self.success = False
self.is_blacklisted = False
class MasterServer:
def __init__(self):
self.visited_urls: Set[str] = set()
self.task_queue: queue.Queue = queue.Queue()
self.machines: Dict[str, 'DistributedWikipediaCrawler.MachineStatus'] = {}
self.rate_limits: Dict[str, float] = {}
self.lock = threading.RLock()
def initialize_crawl(self, seed_urls: List[str]) -> None:
for url in seed_urls:
task = DistributedWikipediaCrawler.CrawlTask(url, 1, time.time(), 0)
self.task_queue.put(task)
def assign_task(self, machine_id: str) -> Optional['DistributedWikipediaCrawler.CrawlTask']:
self._update_machine_heartbeat(machine_id)
try:
task = self.task_queue.get(timeout=5)
domain = self._extract_domain(task.url)
with self.lock:
if domain in self.rate_limits and time.time() < self.rate_limits[domain]:
task.scheduled_time = self.rate_limits[domain] + 1
self.task_queue.put(task)
return self.assign_task(machine_id)
self.visited_urls.add(task.url)
self.rate_limits[domain] = time.time() + 1
return task
except queue.Empty:
return None
def submit_results(self, machine_id: str, result: 'DistributedWikipediaCrawler.CrawlResult') -> None:
self._store_page_content(result)
for link in result.extracted_links:
with self.lock:
if link not in self.visited_urls:
task = DistributedWikipediaCrawler.CrawlTask(link, 2, time.time(), 0)
self.task_queue.put(task)
def handle_machine_failure(self, machine_id: str) -> None:
with self.lock:
if machine_id in self.machines:
self.machines[machine_id].is_blacklisted = True
self._redistribute_tasks(machine_id)
        def _extract_domain(self, url: str) -> str:
            try:
                from urllib.parse import urlparse
                parsed = urlparse(url)
                return parsed.netloc
            except Exception:
                return "unknown"
def _update_machine_heartbeat(self, machine_id: str) -> None:
with self.lock:
if machine_id not in self.machines:
self.machines[machine_id] = DistributedWikipediaCrawler.MachineStatus(machine_id, "unknown")
self.machines[machine_id].last_heartbeat = time.time()
def _store_page_content(self, result: 'DistributedWikipediaCrawler.CrawlResult') -> None:
# Database storage implementation
pass
def _redistribute_tasks(self, failed_machine: str) -> None:
# Implementation for task redistribution
pass
class WorkerMachine:
def __init__(self, machine_id: str, master: 'DistributedWikipediaCrawler.MasterServer'):
self.machine_id = machine_id
self.master = master
self.proxy_list = ["proxy1.com", "proxy2.com", "proxy3.com"]
self.current_proxy_index = 0
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'WikipediaCrawler/1.0 (Educational Purpose)'
})
def start_crawling(self) -> None:
while True:
task = self.master.assign_task(self.machine_id)
if not task:
time.sleep(5)
continue
result = self._crawl_page(task.url)
if result.success:
self.master.submit_results(self.machine_id, result)
elif result.is_blacklisted:
self._rotate_proxy()
self.master.handle_machine_failure(self.machine_id)
time.sleep(0.1) # Rate limiting
def _crawl_page(self, url: str) -> 'DistributedWikipediaCrawler.CrawlResult':
result = DistributedWikipediaCrawler.CrawlResult(url)
try:
response = self.session.get(url, timeout=30)
if response.status_code in [429, 403]:
result.is_blacklisted = True
return result
response.raise_for_status()
result.content = response.text
result.extracted_links = self._parse_links(result.content)
result.success = True
except requests.RequestException:
result.success = False
return result
def _rotate_proxy(self) -> None:
self.current_proxy_index = (self.current_proxy_index + 1) % len(self.proxy_list)
proxy = self.proxy_list[self.current_proxy_index]
self.session.proxies.update({
'http': f'http://{proxy}',
'https': f'https://{proxy}'
})
def _parse_links(self, html: str) -> List[str]:
import re
links = []
pattern = re.compile(r'href="(/wiki/[^"]*)"')
matches = pattern.findall(html)
for match in matches:
links.append(f"https://en.wikipedia.org{match}")
return links
def run_distributed_crawl(self, seed_urls: List[str], num_machines: int) -> None:
master = self.MasterServer()
master.initialize_crawl(seed_urls)
threads = []
for i in range(num_machines):
machine_id = f"machine-{i}"
worker = self.WorkerMachine(machine_id, master)
thread = threading.Thread(target=worker.start_crawling)
thread.daemon = True
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
Complexity
- ⏰ Time complexity: O(P + E), where P is the number of pages and E is the number of edges (links) between pages. Each page is visited once and each link is processed once.
- 🧺 Space complexity: O(P + E) for storing the visited set, task queue, and extracted links. Distributing the state across multiple machines reduces per-machine memory usage.
Method 2 - Event-Driven Architecture with Change Detection
Intuition
Instead of periodically re-crawling all pages, we can use Wikipedia's real-time change feeds and API to detect updates and only crawl modified content. This approach significantly reduces bandwidth and processing requirements while maintaining up-to-date data.
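One building block of this method is asking Wikipedia for a page's latest revision ID and comparing it against what is stored. The sketch below does this through the MediaWiki query API with prop=revisions; the response shape shown should be double-checked against the live API, and the User-Agent string is a placeholder.
import requests

API = "https://en.wikipedia.org/w/api.php"
HEADERS = {"User-Agent": "WikipediaCrawler/1.0 (example; contact@example.org)"}

def latest_revision_id(title: str) -> int:
    """Fetch the newest revision ID for a page title (0 if the page is missing)."""
    params = {
        "action": "query",
        "prop": "revisions",
        "titles": title,
        "rvprop": "ids",
        "format": "json",
        "formatversion": 2,
    }
    data = requests.get(API, params=params, headers=HEADERS, timeout=30).json()
    page = data["query"]["pages"][0]
    revisions = page.get("revisions", [])
    return revisions[0]["revid"] if revisions else 0

stored_revid = 0  # whatever the database last saw for this page
if latest_revision_id("Main Page") > stored_revid:
    pass  # schedule a re-crawl of this page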
Approach
- Initial Seed Crawl: Perform complete crawl using Method 1 to establish baseline
- Wikipedia API Integration: Subscribe to Wikipedia's Recent Changes stream and poll its recent-changes API (a streaming sketch follows this list)
- Event Processing: Process change events to identify new, updated, or deleted pages
- Priority Queueing: Assign higher priority to recently changed pages over discovery crawling
- Incremental Updates: Update database records incrementally rather than full replacement
- Conflict Resolution: Handle concurrent updates and maintain data consistency
- Fallback Mechanism: Periodic full re-crawl for pages that haven't been updated within a specified timeframe
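For the stream-subscription bullet above, Wikimedia exposes a public server-sent-events feed of recent changes at stream.wikimedia.org. The sketch below tails it with plain requests and filters for English Wikipedia events; the field names used here (server_name, type, title) are assumptions to verify against the feed, and a production consumer would also track Last-Event-ID so it can resume after a disconnect.
import json

import requests

STREAM_URL = "https://stream.wikimedia.org/v2/stream/recentchange"

def tail_recent_changes():
    """Yield change events for en.wikipedia.org from the EventStreams SSE feed."""
    with requests.get(STREAM_URL, stream=True, timeout=60,
                      headers={"Accept": "text/event-stream"}) as resp:
        for raw in resp.iter_lines(decode_unicode=True):
            if not raw or not raw.startswith("data: "):
                continue  # skip comments, event/id lines, and keep-alives
            event = json.loads(raw[len("data: "):])
            if event.get("server_name") == "en.wikipedia.org":
                yield event

for change in tail_recent_changes():
    print(change.get("type"), change.get("title"))
    break  # demo: stop after the first matching event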
Code
C++
#include <chrono>
#include <ctime>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
using namespace std;

// A JSON library exposing a `json` type (e.g. nlohmann::json) and an HTTP
// helper (makeApiCall) are assumed; both are left as stubs further down.

class IncrementalWikipediaCrawler {
private:
struct ChangeEvent {
string pageTitle;
string action; // "edit", "new", "delete"
time_t timestamp;
int revisionId;
int priority;
};
class ChangeDetector {
private:
string apiEndpoint = "https://en.wikipedia.org/w/api.php";
time_t lastCheckTime;
public:
vector<ChangeEvent> getRecentChanges(time_t since) {
vector<ChangeEvent> changes;
// API call to get recent changes
            // rcdir=newer lists changes oldest-first starting at rcstart, so the
            // call returns everything since the last check instead of walking
            // backwards from it.
            string url = apiEndpoint + "?action=query&list=recentchanges&rcstart="
                + to_string(since) + "&rcdir=newer&format=json";
auto response = makeApiCall(url);
auto changesJson = parseJson(response);
for (const auto& change : changesJson["query"]["recentchanges"]) {
ChangeEvent event;
event.pageTitle = change["title"];
event.action = change["type"];
event.timestamp = change["timestamp"];
event.revisionId = change["revid"];
event.priority = calculatePriority(event);
changes.push_back(event);
}
return changes;
}
private:
int calculatePriority(const ChangeEvent& event) {
// Higher priority for recent changes and popular pages
int basePriority = 1;
if (event.action == "new") basePriority += 2;
if (time(nullptr) - event.timestamp < 3600) basePriority += 1; // Recent hour
return basePriority;
}
string makeApiCall(const string& url) {
// HTTP implementation
return "";
}
json parseJson(const string& response) {
// JSON parsing implementation
return json{};
}
};
class IncrementalCrawler {
private:
ChangeDetector detector;
priority_queue<ChangeEvent> changeQueue;
unordered_map<string, int> pageVersions;
time_t lastFullCrawl;
public:
void startIncrementalCrawling() {
while (true) {
// Get recent changes
auto changes = detector.getRecentChanges(lastFullCrawl);
for (const auto& change : changes) {
// Check if we need to update this page
if (shouldProcessChange(change)) {
changeQueue.push(change);
}
}
// Process high-priority changes
processChanges();
// Sleep before next check
this_thread::sleep_for(chrono::minutes(5));
}
}
private:
bool shouldProcessChange(const ChangeEvent& change) {
// Skip if we already have this revision
if (pageVersions[change.pageTitle] >= change.revisionId) {
return false;
}
// Always process new pages and recent edits
return true;
}
void processChanges() {
while (!changeQueue.empty()) {
auto change = changeQueue.top();
changeQueue.pop();
if (change.action == "delete") {
removePageFromDatabase(change.pageTitle);
} else {
crawlAndUpdatePage(change.pageTitle, change.revisionId);
}
pageVersions[change.pageTitle] = change.revisionId;
}
}
void crawlAndUpdatePage(const string& pageTitle, int revisionId) {
string url = "https://en.wikipedia.org/wiki/" + urlEncode(pageTitle);
auto content = crawlPage(url);
updateDatabase(pageTitle, content, revisionId);
}
void removePageFromDatabase(const string& pageTitle) {
// Database deletion implementation
}
void updateDatabase(const string& pageTitle, const string& content, int revisionId) {
// Database update implementation
}
string crawlPage(const string& url) {
// Page crawling implementation
return "";
}
string urlEncode(const string& title) {
// URL encoding implementation
return title;
}
};
public:
void runIncrementalCrawl() {
IncrementalCrawler crawler;
crawler.startIncrementalCrawling();
}
};
Go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"sync"
	"time"
)

type ChangeEvent struct {
PageTitle string
Action string // "edit", "new", "delete"
Timestamp time.Time
RevisionID int
Priority int
}
type ChangeDetector struct {
apiEndpoint string
lastCheckTime time.Time
client *http.Client
}
func NewChangeDetector() *ChangeDetector {
return &ChangeDetector{
apiEndpoint: "https://en.wikipedia.org/w/api.php",
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (cd *ChangeDetector) GetRecentChanges(since time.Time) ([]ChangeEvent, error) {
var changes []ChangeEvent
	// rcdir=newer lists changes oldest-first starting at rcstart, so the call
	// returns everything since the last check rather than walking backwards.
	url := fmt.Sprintf("%s?action=query&list=recentchanges&rcstart=%s&rcdir=newer&format=json",
		cd.apiEndpoint, since.UTC().Format("2006-01-02T15:04:05Z"))
resp, err := cd.client.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var apiResponse struct {
Query struct {
RecentChanges []struct {
Title string `json:"title"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
RevID int `json:"revid"`
} `json:"recentchanges"`
} `json:"query"`
}
if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
return nil, err
}
for _, change := range apiResponse.Query.RecentChanges {
event := ChangeEvent{
PageTitle: change.Title,
Action: change.Type,
Timestamp: change.Timestamp,
RevisionID: change.RevID,
Priority: cd.calculatePriority(change.Type, change.Timestamp),
}
changes = append(changes, event)
}
return changes, nil
}
func (cd *ChangeDetector) calculatePriority(action string, timestamp time.Time) int {
basePriority := 1
if action == "new" {
basePriority += 2
}
if time.Since(timestamp) < time.Hour {
basePriority += 1
}
return basePriority
}
type IncrementalCrawler struct {
detector *ChangeDetector
changeQueue chan ChangeEvent
pageVersions map[string]int
lastFullCrawl time.Time
mutex sync.RWMutex
}
func NewIncrementalCrawler() *IncrementalCrawler {
return &IncrementalCrawler{
detector: NewChangeDetector(),
changeQueue: make(chan ChangeEvent, 10000),
pageVersions: make(map[string]int),
lastFullCrawl: time.Now(),
}
}
func (ic *IncrementalCrawler) StartIncrementalCrawling() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
// Start change processor
go ic.processChanges()
for {
select {
case <-ticker.C:
changes, err := ic.detector.GetRecentChanges(ic.lastFullCrawl)
if err != nil {
log.Printf("Error getting recent changes: %v", err)
continue
}
for _, change := range changes {
if ic.shouldProcessChange(change) {
select {
case ic.changeQueue <- change:
default:
log.Printf("Change queue full, dropping change for %s", change.PageTitle)
}
}
}
ic.lastFullCrawl = time.Now()
}
}
}
func (ic *IncrementalCrawler) shouldProcessChange(change ChangeEvent) bool {
ic.mutex.RLock()
currentVersion, exists := ic.pageVersions[change.PageTitle]
ic.mutex.RUnlock()
return !exists || currentVersion < change.RevisionID
}
func (ic *IncrementalCrawler) processChanges() {
for change := range ic.changeQueue {
switch change.Action {
case "delete":
ic.removePageFromDatabase(change.PageTitle)
default:
ic.crawlAndUpdatePage(change.PageTitle, change.RevisionID)
}
ic.mutex.Lock()
ic.pageVersions[change.PageTitle] = change.RevisionID
ic.mutex.Unlock()
}
}
func (ic *IncrementalCrawler) crawlAndUpdatePage(pageTitle string, revisionID int) {
	// PathEscape keeps the title usable as a URL path segment and avoids
	// shadowing the net/url package with a local variable named url.
	pageURL := "https://en.wikipedia.org/wiki/" + url.PathEscape(pageTitle)
	content, err := ic.crawlPage(pageURL)
if err != nil {
log.Printf("Error crawling page %s: %v", pageTitle, err)
return
}
ic.updateDatabase(pageTitle, content, revisionID)
}
func (ic *IncrementalCrawler) crawlPage(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
func (ic *IncrementalCrawler) removePageFromDatabase(pageTitle string) {
// Database deletion implementation
}
func (ic *IncrementalCrawler) updateDatabase(pageTitle, content string, revisionID int) {
// Database update implementation
}
func RunIncrementalCrawl() {
crawler := NewIncrementalCrawler()
crawler.StartIncrementalCrawling()
}
Java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;

// JSON parsing uses the Jackson library (com.fasterxml.jackson.databind).
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class IncrementalWikipediaCrawler {
static class ChangeEvent {
String pageTitle;
String action; // "edit", "new", "delete"
Instant timestamp;
int revisionId;
int priority;
ChangeEvent(String pageTitle, String action, Instant timestamp, int revisionId, int priority) {
this.pageTitle = pageTitle;
this.action = action;
this.timestamp = timestamp;
this.revisionId = revisionId;
this.priority = priority;
}
}
static class ChangeDetector {
private static final String API_ENDPOINT = "https://en.wikipedia.org/w/api.php";
private final HttpClient client;
private final ObjectMapper mapper;
public ChangeDetector() {
this.client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(30))
.build();
this.mapper = new ObjectMapper();
}
public List<ChangeEvent> getRecentChanges(Instant since) throws Exception {
List<ChangeEvent> changes = new ArrayList<>();
            // rcdir=newer lists changes oldest-first starting at rcstart, so the
            // call returns everything since the last check.
            String url = API_ENDPOINT + "?action=query&list=recentchanges&rcstart=" +
                    since.toString() + "&rcdir=newer&format=json";
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
JsonNode root = mapper.readTree(response.body());
JsonNode recentChanges = root.path("query").path("recentchanges");
for (JsonNode change : recentChanges) {
String title = change.path("title").asText();
String type = change.path("type").asText();
Instant timestamp = Instant.parse(change.path("timestamp").asText());
int revId = change.path("revid").asInt();
int priority = calculatePriority(type, timestamp);
changes.add(new ChangeEvent(title, type, timestamp, revId, priority));
}
return changes;
}
private int calculatePriority(String action, Instant timestamp) {
int basePriority = 1;
if ("new".equals(action)) {
basePriority += 2;
}
if (Duration.between(timestamp, Instant.now()).toHours() < 1) {
basePriority += 1;
}
return basePriority;
}
}
static class IncrementalCrawler {
private final ChangeDetector detector;
private final BlockingQueue<ChangeEvent> changeQueue;
private final Map<String, Integer> pageVersions;
private Instant lastFullCrawl;
private final ScheduledExecutorService scheduler;
private final ExecutorService processor;
public IncrementalCrawler() {
this.detector = new ChangeDetector();
this.changeQueue = new LinkedBlockingQueue<>(10000);
this.pageVersions = new ConcurrentHashMap<>();
this.lastFullCrawl = Instant.now();
this.scheduler = Executors.newScheduledThreadPool(1);
this.processor = Executors.newFixedThreadPool(5);
}
public void startIncrementalCrawling() {
// Start change processor
processor.submit(this::processChanges);
// Schedule periodic change detection
scheduler.scheduleAtFixedRate(this::detectChanges, 0, 5, TimeUnit.MINUTES);
}
private void detectChanges() {
try {
List<ChangeEvent> changes = detector.getRecentChanges(lastFullCrawl);
for (ChangeEvent change : changes) {
if (shouldProcessChange(change)) {
if (!changeQueue.offer(change)) {
System.err.println("Change queue full, dropping change for " + change.pageTitle);
}
}
}
lastFullCrawl = Instant.now();
} catch (Exception e) {
System.err.println("Error detecting changes: " + e.getMessage());
}
}
private boolean shouldProcessChange(ChangeEvent change) {
Integer currentVersion = pageVersions.get(change.pageTitle);
return currentVersion == null || currentVersion < change.revisionId;
}
private void processChanges() {
while (!Thread.currentThread().isInterrupted()) {
try {
ChangeEvent change = changeQueue.take();
switch (change.action) {
case "delete":
removePageFromDatabase(change.pageTitle);
break;
default:
crawlAndUpdatePage(change.pageTitle, change.revisionId);
break;
}
pageVersions.put(change.pageTitle, change.revisionId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void crawlAndUpdatePage(String pageTitle, int revisionId) {
try {
                // Wikipedia titles use underscores instead of spaces; encode the
                // rest of the title for use in a URL path.
                String url = "https://en.wikipedia.org/wiki/" +
                        URLEncoder.encode(pageTitle.replace(' ', '_'), StandardCharsets.UTF_8);
String content = crawlPage(url);
updateDatabase(pageTitle, content, revisionId);
} catch (Exception e) {
System.err.println("Error crawling page " + pageTitle + ": " + e.getMessage());
}
}
private String crawlPage(String url) throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
HttpResponse<String> response = HttpClient.newHttpClient()
.send(request, HttpResponse.BodyHandlers.ofString());
return response.body();
}
private void removePageFromDatabase(String pageTitle) {
// Database deletion implementation
}
private void updateDatabase(String pageTitle, String content, int revisionId) {
// Database update implementation
}
}
public void runIncrementalCrawl() {
IncrementalCrawler crawler = new IncrementalCrawler();
crawler.startIncrementalCrawling();
}
}
Python
import queue
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List

import requests


class IncrementalWikipediaCrawler:
    class ChangeEvent:
        def __init__(self, page_title: str, action: str, timestamp: datetime, revision_id: int, priority: int):
            self.page_title = page_title
            self.action = action  # "edit", "new", "delete"
            self.timestamp = timestamp
            self.revision_id = revision_id
            self.priority = priority

        def __lt__(self, other: 'IncrementalWikipediaCrawler.ChangeEvent') -> bool:
            # PriorityQueue entries are (-priority, event) tuples; when two events
            # share the same priority the queue falls back to comparing the events
            # themselves, so an ordering is needed to avoid a TypeError.
            return self.timestamp < other.timestamp
class ChangeDetector:
def __init__(self):
self.api_endpoint = "https://en.wikipedia.org/w/api.php"
self.session = requests.Session()
def get_recent_changes(self, since: datetime) -> List['IncrementalWikipediaCrawler.ChangeEvent']:
changes = []
            params = {
                'action': 'query',
                'list': 'recentchanges',
                'rcstart': since.isoformat(),
                # rcdir=newer lists changes oldest-first starting at rcstart,
                # i.e. everything since the last check.
                'rcdir': 'newer',
                'format': 'json',
                'rclimit': 500
            }
try:
response = self.session.get(self.api_endpoint, params=params, timeout=30)
response.raise_for_status()
data = response.json()
for change in data.get('query', {}).get('recentchanges', []):
title = change.get('title', '')
action = change.get('type', '')
timestamp = datetime.fromisoformat(change.get('timestamp', '').replace('Z', '+00:00'))
rev_id = change.get('revid', 0)
priority = self._calculate_priority(action, timestamp)
event = IncrementalWikipediaCrawler.ChangeEvent(
title, action, timestamp, rev_id, priority
)
changes.append(event)
except requests.RequestException as e:
print(f"Error getting recent changes: {e}")
return changes
def _calculate_priority(self, action: str, timestamp: datetime) -> int:
base_priority = 1
if action == "new":
base_priority += 2
if datetime.now(timezone.utc) - timestamp < timedelta(hours=1):
base_priority += 1
return base_priority
class IncrementalCrawler:
def __init__(self):
self.detector = IncrementalWikipediaCrawler.ChangeDetector()
self.change_queue: queue.PriorityQueue = queue.PriorityQueue(maxsize=10000)
self.page_versions: Dict[str, int] = {}
self.last_full_crawl = datetime.now(timezone.utc)
self.lock = threading.RLock()
self.session = requests.Session()
def start_incremental_crawling(self) -> None:
# Start change processor thread
processor_thread = threading.Thread(target=self._process_changes)
processor_thread.daemon = True
processor_thread.start()
# Main detection loop
while True:
try:
changes = self.detector.get_recent_changes(self.last_full_crawl)
for change in changes:
if self._should_process_change(change):
try:
# Use negative priority for max-heap behavior
self.change_queue.put((-change.priority, change), timeout=1)
except queue.Full:
print(f"Change queue full, dropping change for {change.page_title}")
self.last_full_crawl = datetime.now(timezone.utc)
time.sleep(300) # 5 minutes
except Exception as e:
print(f"Error in change detection loop: {e}")
time.sleep(60) # Wait before retry
def _should_process_change(self, change: 'IncrementalWikipediaCrawler.ChangeEvent') -> bool:
with self.lock:
current_version = self.page_versions.get(change.page_title, 0)
return current_version < change.revision_id
def _process_changes(self) -> None:
while True:
try:
priority, change = self.change_queue.get(timeout=10)
if change.action == "delete":
self._remove_page_from_database(change.page_title)
else:
self._crawl_and_update_page(change.page_title, change.revision_id)
with self.lock:
self.page_versions[change.page_title] = change.revision_id
self.change_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Error processing change: {e}")
def _crawl_and_update_page(self, page_title: str, revision_id: int) -> None:
try:
url = f"https://en.wikipedia.org/wiki/{requests.utils.quote(page_title)}"
content = self._crawl_page(url)
self._update_database(page_title, content, revision_id)
except Exception as e:
print(f"Error crawling page {page_title}: {e}")
def _crawl_page(self, url: str) -> str:
response = self.session.get(url, timeout=30)
response.raise_for_status()
return response.text
def _remove_page_from_database(self, page_title: str) -> None:
# Database deletion implementation
pass
def _update_database(self, page_title: str, content: str, revision_id: int) -> None:
# Database update implementation
pass
def run_incremental_crawl(self) -> None:
crawler = self.IncrementalCrawler()
crawler.start_incremental_crawling()
Complexity
- ⏰ Time complexity: O(C + U), where C is the number of changes detected and U is the number of unique page updates. Much more efficient than a full re-crawl.
- 🧺 Space complexity: O(P + Q), where P is the number of tracked page versions and Q is the change queue size. Significantly smaller than the full-crawl approach.
Notes
- Method 1 provides a comprehensive distributed crawling solution suitable for initial complete Wikipedia capture
- Method 2 offers an efficient incremental update system for maintaining synchronization with minimal resource usage
- Both methods handle key challenges: rate limiting, IP blacklisting, fault tolerance, and distributed coordination
- The system can be extended with additional features like content deduplication, language detection, and quality scoring
- Production implementation should include monitoring, alerting, and detailed logging for operational visibility
- Consider using established message queue systems (Apache Kafka, Redis Streams) for better scalability and durability
- Database design should support efficient partial updates and maintain referential integrity across distributed updates
- Legal and ethical considerations include respecting Wikipedia's terms of service and avoiding excessive load on their servers
- Real-world implementation might benefit from using Wikipedia's official dumps and APIs rather than web scraping (see the sketch after this list)
- The architecture can be adapted for other large-scale web crawling scenarios beyond Wikipedia
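As a pointer for the dumps-and-APIs note above, page wikitext can be pulled directly from the MediaWiki API rather than scraping rendered HTML. The sketch below uses prop=revisions with the main slot; as with the other API sketches, the parameters and response shape should be confirmed against the current API documentation, and the User-Agent is a placeholder.
import requests

API = "https://en.wikipedia.org/w/api.php"
HEADERS = {"User-Agent": "WikipediaCrawler/1.0 (example; contact@example.org)"}

def fetch_wikitext(title: str) -> str:
    """Return the current wikitext of a page via the MediaWiki API."""
    params = {
        "action": "query",
        "prop": "revisions",
        "titles": title,
        "rvprop": "content",
        "rvslots": "main",
        "format": "json",
        "formatversion": 2,
    }
    data = requests.get(API, params=params, headers=HEADERS, timeout=30).json()
    page = data["query"]["pages"][0]
    return page["revisions"][0]["slots"]["main"]["content"]

print(fetch_wikitext("Main Page")[:200])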