Problem

Design a system to crawl and copy all of Wikipedia using a distributed network of machines.

More specifically, suppose your server has access to a set of client machines. Your client machines can execute code you have written to access Wikipedia pages, download and parse their data, and write the results to a database.

Some questions you may want to consider as part of your solution are:

  • How will you reach as many pages as possible?
  • How can you keep track of pages that have already been visited?
  • How will you deal with your client machines being blacklisted?
  • How can you update your database when Wikipedia pages are added or updated?

Examples

Example 1

Input: 
- Starting URLs: ["https://en.wikipedia.org/wiki/Main_Page"]
- Client machines: 100
- Target: Complete English Wikipedia crawl
Output:
- Crawled pages: ~6.7 million articles
- Database entries: Article content, metadata, links
- Time: ~24-48 hours with proper rate limiting
Explanation:
Starting from the Main Page, discover all other pages by following links in BFS order.
Use a distributed queue system to coordinate work across the 100 machines.
Implement rate limiting to avoid being blocked by Wikipedia.
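
A minimal single-machine sketch of this BFS discovery loop (fetch_links is an assumed helper returning a page's outgoing /wiki/ links; the distributed version in the Solution section replaces the local deque with a shared queue):

from collections import deque

def bfs_crawl(seed_urls, fetch_links, max_pages=1_000_000):
    # Breadth-first frontier: each discovered URL is visited exactly once.
    visited = set(seed_urls)
    frontier = deque(seed_urls)
    while frontier and len(visited) < max_pages:
        url = frontier.popleft()
        for link in fetch_links(url):
            if link not in visited:
                visited.add(link)
                frontier.append(link)
    return visited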

Example 2

Input:
- Starting URLs: Multiple language Wikipedia main pages
- Client machines: 500
- Target: Multi-language Wikipedia crawl
Output:
- Crawled pages: ~60+ million articles across languages
- Database schema: Language-specific tables with cross-references
- Incremental updates: Daily delta processing
Explanation:
Separate crawling queues per language to handle different update frequencies.
Use language-specific crawlers with appropriate character encoding.
Implement change detection through Wikipedia's API and revision tracking.
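
A minimal sketch of per-language crawl configuration, assuming the standard <lang>.wikipedia.org layout; the queue names are illustrative:

from typing import Dict

LANGS = ["en", "de", "fr", "ja", "es"]

def language_config(lang: str) -> Dict[str, str]:
    # One crawl queue and API endpoint per language edition.
    return {
        "base": f"https://{lang}.wikipedia.org",
        "api": f"https://{lang}.wikipedia.org/w/api.php",
        "queue": f"crawl:{lang}",
    }

configs = {lang: language_config(lang) for lang in LANGS}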

Example 3

Input:
- Scenario: 50% of client machines get IP-banned
- Remaining machines: 50
- Requirement: Continue crawling without interruption
Output:
- Failover time: <5 minutes
- Crawl completion: Delayed but successful
- IP rotation: Automatic proxy switching
Explanation:
Implement IP rotation with proxy pools and VPN services.
Use exponential backoff for banned IPs with automatic retry.
Redistribute work from failed machines to healthy ones.
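
A minimal sketch of the backoff-and-retry policy described here; fetch and rotate_proxy are assumed placeholders for the worker's HTTP and proxy layers:

import random
import time

def fetch_with_backoff(fetch, rotate_proxy, url, max_retries=5, base_delay=1.0):
    """Retry with exponential backoff plus jitter; rotate the proxy on 403/429."""
    for attempt in range(max_retries):
        status, body = fetch(url)            # assumed helper: returns (status_code, text)
        if status == 200:
            return body
        if status in (403, 429):             # likely throttled or banned
            rotate_proxy()
        time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))
    raise RuntimeError(f"giving up on {url} after {max_retries} attempts")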

Example 4

Input:
- Existing database: 6.5M articles as of yesterday
- New Wikipedia state: 6.502M articles (2K new, 500 updated)
- Update frequency: Every 6 hours
Output:
- Incremental crawl: Only 2.5K pages processed
- Update time: 15 minutes vs 48 hours for full crawl
- Consistency: 99.9% synchronization with Wikipedia
Explanation:
Use Wikipedia's Recent Changes API to identify modified pages.
Compare timestamps and revision IDs to detect updates.
Process new and updated pages through a priority queue instead of a full re-crawl.
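
A minimal sketch of polling the Recent Changes API with continuation; the endpoint and parameters follow the public MediaWiki API, and the rcprop fields shown are one reasonable subset:

import requests

API = "https://en.wikipedia.org/w/api.php"

def recent_changes(since_iso: str):
    """Yield recent-change records from now back to the given ISO 8601 timestamp."""
    params = {
        "action": "query", "list": "recentchanges", "format": "json",
        "rcend": since_iso,                  # stop once changes are older than this
        "rcprop": "title|ids|timestamp", "rclimit": 500,
    }
    while True:
        data = requests.get(API, params=params, timeout=30).json()
        yield from data["query"]["recentchanges"]
        if "continue" not in data:
            break
        params.update(data["continue"])      # carries rccontinue for the next page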

Example 5

Input:
- Memory constraint: 8GB RAM per machine
- Page size: Average 50KB per article
- Concurrent connections: 100 per machine
Output:
- Memory usage: ~5MB for page buffers + 2GB for URL queue
- Throughput: 10 pages/second per machine
- Queue size: 1M URLs buffered locally per machine
Explanation:
Implement memory-efficient streaming processing.
Use disk-based queues for large URL lists.
Batch database writes to optimize I/O operations.
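
A minimal sketch of the batched-write idea; db_insert_many is an assumed bulk-insert helper, and the same pattern applies to flushing a disk-backed URL queue:

def batched_writer(pages, db_insert_many, batch_size=500):
    """Accumulate crawled records and flush them in bulk to cut per-write I/O overhead."""
    batch = []
    for page in pages:                       # pages: any iterator of crawled records
        batch.append(page)
        if len(batch) >= batch_size:
            db_insert_many(batch)            # one bulk insert instead of batch_size single writes
            batch = []
    if batch:
        db_insert_many(batch)                # flush the final partial batch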

Solution

Method 1 - Master-Worker Architecture with Distributed Queue

Intuition

A distributed web crawler requires coordination between multiple machines to avoid duplicate work while maximizing coverage. The key insight is to use a master-worker pattern where a central coordinator assigns work to client machines through distributed queues, while maintaining a global visited set to prevent redundant crawling.

Approach

  1. Master Server Setup: Initialize central coordinator with seed URLs and client machine registry
  2. Distributed Queue System: Use message queue (Redis/RabbitMQ) to distribute URLs to worker machines
  3. Visited Set Management: Maintain a distributed hash set (e.g. Redis) to track crawled pages across all workers (see the Redis sketch after this list)
  4. Worker Process: Each client machine polls for URLs, crawls pages, extracts links, and stores results
  5. Rate Limiting: Implement distributed rate limiting to respect Wikipedia’s robots.txt and avoid IP bans
  6. Fault Tolerance: Handle machine failures through heartbeats and work redistribution
  7. Incremental Updates: Use Wikipedia’s API to detect changes and schedule re-crawls
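
A minimal sketch of steps 3 and 5 backed by Redis, assuming a reachable Redis instance and the redis-py client; the key names are illustrative:

import redis

r = redis.Redis(host="localhost", port=6379)

def claim_url(url: str) -> bool:
    """Atomically mark a URL as visited; True only for the first worker to claim it."""
    return r.sadd("crawler:visited", url) == 1

def acquire_domain_slot(domain: str, delay_seconds: int = 1) -> bool:
    """Distributed per-domain rate limit: succeeds at most once per delay_seconds."""
    return r.set(f"crawler:ratelimit:{domain}", "1", nx=True, ex=delay_seconds) is True

A worker that fails acquire_domain_slot simply re-queues the task with a short delay, mirroring the re-queue logic in the code below.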

Code

C++

#include <chrono>
#include <ctime>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
using namespace std;

// Result of crawling a single page; declared before the crawler classes that use it.
struct CrawlResult {
    string url;
    string content;
    vector<string> extractedLinks;
    time_t timestamp = 0;
    bool success = false;
    bool isBlacklisted = false;
};

class DistributedWikipediaCrawler {
private:
    struct CrawlTask {
        string url;
        int priority;
        time_t scheduledTime;
        int retryCount;
        // Ordering for priority_queue: higher-priority tasks are assigned first.
        bool operator<(const CrawlTask& other) const {
            return priority < other.priority;
        }
    };
    
    struct MachineStatus {
        string machineId;
        string ipAddress;
        time_t lastHeartbeat;
        bool isBlacklisted;
        int crawlRate;
    };
    
    class MasterServer {
    private:
        unordered_set<string> visitedUrls;
        priority_queue<CrawlTask> taskQueue;
        unordered_map<string, MachineStatus> machines;
        unordered_map<string, time_t> rateLimits;
        
    public:
        void initializeCrawl(vector<string>& seedUrls) {
            for (const auto& url : seedUrls) {
                taskQueue.push({url, 1, time(nullptr), 0});
            }
        }
        
        CrawlTask assignTask(const string& machineId) {
            updateMachineHeartbeat(machineId);
            
            if (taskQueue.empty()) {
                return {"", 0, 0, 0};
            }
            
            auto task = taskQueue.top();
            taskQueue.pop();
            
            // Check rate limiting
            string domain = extractDomain(task.url);
            if (rateLimits[domain] > time(nullptr)) {
                // Re-queue with delay
                task.scheduledTime = rateLimits[domain] + 1;
                taskQueue.push(task);
                return assignTask(machineId);
            }
            
            visitedUrls.insert(task.url);
            rateLimits[domain] = time(nullptr) + 1; // 1 second delay
            
            return task;
        }
        
        void submitResults(const string& machineId, const CrawlResult& result) {
            // Store in database
            storePageContent(result);
            
            // Extract and queue new URLs
            for (const auto& link : result.extractedLinks) {
                if (visitedUrls.find(link) == visitedUrls.end()) {
                    taskQueue.push({link, 2, time(nullptr), 0});
                }
            }
        }
        
        void handleMachineFailure(const string& machineId) {
            machines[machineId].isBlacklisted = true;
            // Redistribute pending tasks
            redistributeTasks(machineId);
        }
        
    private:
        string extractDomain(const string& url) {
            size_t start = url.find("://");
            start = (start == string::npos) ? 0 : start + 3;
            size_t end = url.find('/', start);
            return url.substr(start, end == string::npos ? string::npos : end - start);
        }
        
        void redistributeTasks(const string& failedMachine) {
            // Implementation for task redistribution
        }
        
        void updateMachineHeartbeat(const string& machineId) {
            machines[machineId].lastHeartbeat = time(nullptr);
        }
        
        void storePageContent(const CrawlResult& result) {
            // Database storage implementation
        }
    };
    
    class WorkerMachine {
    private:
        string machineId;
        MasterServer* master;
        vector<string> proxyList;
        int currentProxyIndex = 0;
        
    public:
        // Each worker needs a reference to the coordinator it pulls tasks from.
        WorkerMachine(const string& id, MasterServer* m) : machineId(id), master(m) {}
        
        void startCrawling() {
            while (true) {
                auto task = master->assignTask(machineId);
                if (task.url.empty()) {
                    this_thread::sleep_for(chrono::seconds(5));
                    continue;
                }
                
                auto result = crawlPage(task.url);
                if (result.success) {
                    master->submitResults(machineId, result);
                } else if (result.isBlacklisted) {
                    rotateProxy();
                    master->handleMachineFailure(machineId);
                }
                
                // Rate limiting
                this_thread::sleep_for(chrono::milliseconds(100));
            }
        }
        
    private:
        CrawlResult crawlPage(const string& url) {
            // makeHttpRequest and HttpException are assumed HTTP-layer helpers
            // (proxy-aware request plus a typed error carrying the status code).
            CrawlResult result;
            
            try {
                auto response = makeHttpRequest(url);
                result.content = response.body;
                result.extractedLinks = parseLinks(response.body);
                result.success = true;
            } catch (const HttpException& e) {
                if (e.statusCode == 429 || e.statusCode == 403) {
                    result.isBlacklisted = true;
                }
                result.success = false;
            }
            
            return result;
        }
        
        void rotateProxy() {
            if (!proxyList.empty()) {
                currentProxyIndex = (currentProxyIndex + 1) % static_cast<int>(proxyList.size());
            }
        }
        
        vector<string> parseLinks(const string& html) {
            vector<string> links;
            // HTML parsing implementation
            return links;
        }
    };
    
public:
    void runDistributedCrawl(vector<string>& seedUrls, int numMachines) {
        MasterServer master;
        master.initializeCrawl(seedUrls);
        
        vector<thread> workers;
        for (int i = 0; i < numMachines; i++) {
            workers.emplace_back([&master, i]() {
                WorkerMachine worker("machine-" + to_string(i), &master);
                worker.startCrawling();
            });
        }
        
        for (auto& worker : workers) {
            worker.join();
        }
    }
};

Go

package main

import (
    "fmt"
    "io"
    "net/http"
    "regexp"
    "strings"
    "sync"
    "time"
)

type CrawlTask struct {
    URL           string
    Priority      int
    ScheduledTime time.Time
    RetryCount    int
}

type MachineStatus struct {
    MachineID     string
    IPAddress     string
    LastHeartbeat time.Time
    IsBlacklisted bool
    CrawlRate     int
}

type CrawlResult struct {
    URL            string
    Content        string
    ExtractedLinks []string
    Timestamp      time.Time
    Success        bool
    IsBlacklisted  bool
}

type MasterServer struct {
    visitedUrls map[string]bool
    taskQueue   chan CrawlTask
    machines    map[string]*MachineStatus
    rateLimits  map[string]time.Time
    mutex       sync.RWMutex
}

func NewMasterServer() *MasterServer {
    return &MasterServer{
        visitedUrls: make(map[string]bool),
        taskQueue:   make(chan CrawlTask, 1000000),
        machines:    make(map[string]*MachineStatus),
        rateLimits:  make(map[string]time.Time),
    }
}

func (ms *MasterServer) InitializeCrawl(seedUrls []string) {
    for _, url := range seedUrls {
        ms.taskQueue <- CrawlTask{
            URL:           url,
            Priority:      1,
            ScheduledTime: time.Now(),
            RetryCount:    0,
        }
    }
}

func (ms *MasterServer) AssignTask(machineID string) (CrawlTask, error) {
    ms.updateMachineHeartbeat(machineID)
    
    select {
    case task := <-ms.taskQueue:
        domain := ms.extractDomain(task.URL)
        
        ms.mutex.Lock()
        if rateLimit, exists := ms.rateLimits[domain]; exists && time.Now().Before(rateLimit) {
            // Re-queue with delay
            task.ScheduledTime = rateLimit.Add(time.Second)
            ms.taskQueue <- task
            ms.mutex.Unlock()
            return ms.AssignTask(machineID)
        }
        
        ms.visitedUrls[task.URL] = true
        ms.rateLimits[domain] = time.Now().Add(time.Second)
        ms.mutex.Unlock()
        
        return task, nil
    case <-time.After(5 * time.Second):
        return CrawlTask{}, fmt.Errorf("no tasks available")
    }
}

func (ms *MasterServer) SubmitResults(machineID string, result CrawlResult) {
    // Store in database
    ms.storePageContent(result)
    
    // Extract and queue new URLs
    for _, link := range result.ExtractedLinks {
        ms.mutex.RLock()
        if !ms.visitedUrls[link] {
            ms.taskQueue <- CrawlTask{
                URL:           link,
                Priority:      2,
                ScheduledTime: time.Now(),
                RetryCount:    0,
            }
        }
        ms.mutex.RUnlock()
    }
}

func (ms *MasterServer) HandleMachineFailure(machineID string) {
    ms.mutex.Lock()
    if machine, exists := ms.machines[machineID]; exists {
        machine.IsBlacklisted = true
    }
    ms.mutex.Unlock()
    
    // Redistribute pending tasks
    ms.redistributeTasks(machineID)
}

func (ms *MasterServer) extractDomain(url string) string {
    if idx := strings.Index(url, "://"); idx != -1 {
        url = url[idx+3:]
    }
    if idx := strings.Index(url, "/"); idx != -1 {
        url = url[:idx]
    }
    return url
}

func (ms *MasterServer) updateMachineHeartbeat(machineID string) {
    ms.mutex.Lock()
    if machine, exists := ms.machines[machineID]; exists {
        machine.LastHeartbeat = time.Now()
    } else {
        ms.machines[machineID] = &MachineStatus{
            MachineID:     machineID,
            LastHeartbeat: time.Now(),
            IsBlacklisted: false,
        }
    }
    ms.mutex.Unlock()
}

func (ms *MasterServer) storePageContent(result CrawlResult) {
    // Database storage implementation
}

func (ms *MasterServer) redistributeTasks(failedMachine string) {
    // Implementation for task redistribution
}

type WorkerMachine struct {
    machineID         string
    master           *MasterServer
    proxyList        []string
    currentProxyIndex int
    client           *http.Client
}

func NewWorkerMachine(machineID string, master *MasterServer) *WorkerMachine {
    return &WorkerMachine{
        machineID: machineID,
        master:    master,
        proxyList: []string{"proxy1.com", "proxy2.com", "proxy3.com"},
        client:    &http.Client{Timeout: 30 * time.Second},
    }
}

func (wm *WorkerMachine) StartCrawling() {
    for {
        task, err := wm.master.AssignTask(wm.machineID)
        if err != nil {
            time.Sleep(5 * time.Second)
            continue
        }
        
        result := wm.crawlPage(task.URL)
        if result.Success {
            wm.master.SubmitResults(wm.machineID, result)
        } else if result.IsBlacklisted {
            wm.rotateProxy()
            wm.master.HandleMachineFailure(wm.machineID)
        }
        
        // Rate limiting
        time.Sleep(100 * time.Millisecond)
    }
}

func (wm *WorkerMachine) crawlPage(url string) CrawlResult {
    result := CrawlResult{
        URL:       url,
        Timestamp: time.Now(),
    }
    
    resp, err := wm.client.Get(url)
    if err != nil {
        result.Success = false
        return result
    }
    defer resp.Body.Close()
    
    if resp.StatusCode == 429 || resp.StatusCode == 403 {
        result.IsBlacklisted = true
        result.Success = false
        return result
    }
    
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        result.Success = false
        return result
    }
    
    result.Content = string(body)
    result.ExtractedLinks = wm.parseLinks(result.Content)
    result.Success = true
    
    return result
}

func (wm *WorkerMachine) rotateProxy() {
    wm.currentProxyIndex = (wm.currentProxyIndex + 1) % len(wm.proxyList)
    // Update HTTP client with new proxy
}

func (wm *WorkerMachine) parseLinks(html string) []string {
    var links []string
    // HTML parsing implementation using regex or parser
    re := regexp.MustCompile(`href="(/wiki/[^"]*)"`)
    matches := re.FindAllStringSubmatch(html, -1)
    
    for _, match := range matches {
        if len(match) > 1 {
            links = append(links, "https://en.wikipedia.org"+match[1])
        }
    }
    
    return links
}

func RunDistributedCrawl(seedUrls []string, numMachines int) {
    master := NewMasterServer()
    master.InitializeCrawl(seedUrls)
    
    var wg sync.WaitGroup
    for i := 0; i < numMachines; i++ {
        wg.Add(1)
        go func(machineID string) {
            defer wg.Done()
            worker := NewWorkerMachine(machineID, master)
            worker.StartCrawling()
        }(fmt.Sprintf("machine-%d", i))
    }
    
    wg.Wait()
}

Java

import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DistributedWikipediaCrawler {
    
    static class CrawlTask {
        String url;
        int priority;
        long scheduledTime;
        int retryCount;
        
        CrawlTask(String url, int priority, long scheduledTime, int retryCount) {
            this.url = url;
            this.priority = priority;
            this.scheduledTime = scheduledTime;
            this.retryCount = retryCount;
        }
    }
    
    static class MachineStatus {
        String machineId;
        String ipAddress;
        long lastHeartbeat;
        boolean isBlacklisted;
        int crawlRate;
        
        MachineStatus(String machineId, String ipAddress) {
            this.machineId = machineId;
            this.ipAddress = ipAddress;
            this.lastHeartbeat = System.currentTimeMillis();
            this.isBlacklisted = false;
            this.crawlRate = 10;
        }
    }
    
    static class CrawlResult {
        String url;
        String content;
        List<String> extractedLinks;
        long timestamp;
        boolean success;
        boolean isBlacklisted;
        
        CrawlResult(String url) {
            this.url = url;
            this.extractedLinks = new ArrayList<>();
            this.timestamp = System.currentTimeMillis();
            this.success = false;
            this.isBlacklisted = false;
        }
    }
    
    static class MasterServer {
        private final Set<String> visitedUrls = ConcurrentHashMap.newKeySet();
        private final BlockingQueue<CrawlTask> taskQueue = new LinkedBlockingQueue<>();
        private final Map<String, MachineStatus> machines = new ConcurrentHashMap<>();
        private final Map<String, Long> rateLimits = new ConcurrentHashMap<>();
        private final ExecutorService executor = Executors.newCachedThreadPool();
        
        public void initializeCrawl(List<String> seedUrls) {
            for (String url : seedUrls) {
                taskQueue.offer(new CrawlTask(url, 1, System.currentTimeMillis(), 0));
            }
        }
        
        public CrawlTask assignTask(String machineId) throws InterruptedException {
            updateMachineHeartbeat(machineId);
            
            CrawlTask task = taskQueue.take();
            String domain = extractDomain(task.url);
            
            Long rateLimit = rateLimits.get(domain);
            if (rateLimit != null && System.currentTimeMillis() < rateLimit) {
                task.scheduledTime = rateLimit + 1000;
                taskQueue.offer(task);
                return assignTask(machineId);
            }
            
            visitedUrls.add(task.url);
            rateLimits.put(domain, System.currentTimeMillis() + 1000);
            
            return task;
        }
        
        public void submitResults(String machineId, CrawlResult result) {
            storePageContent(result);
            
            for (String link : result.extractedLinks) {
                if (!visitedUrls.contains(link)) {
                    taskQueue.offer(new CrawlTask(link, 2, System.currentTimeMillis(), 0));
                }
            }
        }
        
        public void handleMachineFailure(String machineId) {
            MachineStatus machine = machines.get(machineId);
            if (machine != null) {
                machine.isBlacklisted = true;
            }
            redistributeTasks(machineId);
        }
        
        private String extractDomain(String url) {
            try {
                URL u = new URL(url);
                return u.getHost();
            } catch (Exception e) {
                return "unknown";
            }
        }
        
        private void updateMachineHeartbeat(String machineId) {
            machines.computeIfAbsent(machineId, id -> new MachineStatus(id, "unknown"))
                    .lastHeartbeat = System.currentTimeMillis();
        }
        
        private void storePageContent(CrawlResult result) {
            // Database storage implementation
        }
        
        private void redistributeTasks(String failedMachine) {
            // Implementation for task redistribution
        }
    }
    
    static class WorkerMachine {
        private final String machineId;
        private final MasterServer master;
        private final List<String> proxyList;
        private int currentProxyIndex;
        private final HttpClient client;
        
        public WorkerMachine(String machineId, MasterServer master) {
            this.machineId = machineId;
            this.master = master;
            this.proxyList = Arrays.asList("proxy1.com", "proxy2.com", "proxy3.com");
            this.currentProxyIndex = 0;
            this.client = HttpClient.newBuilder()
                    .connectTimeout(Duration.ofSeconds(30))
                    .build();
        }
        
        public void startCrawling() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    CrawlTask task = master.assignTask(machineId);
                    CrawlResult result = crawlPage(task.url);
                    
                    if (result.success) {
                        master.submitResults(machineId, result);
                    } else if (result.isBlacklisted) {
                        rotateProxy();
                        master.handleMachineFailure(machineId);
                    }
                    
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        private CrawlResult crawlPage(String url) {
            CrawlResult result = new CrawlResult(url);
            
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(url))
                        .timeout(Duration.ofSeconds(30))
                        .build();
                
                HttpResponse<String> response = client.send(request, 
                        HttpResponse.BodyHandlers.ofString());
                
                if (response.statusCode() == 429 || response.statusCode() == 403) {
                    result.isBlacklisted = true;
                    return result;
                }
                
                result.content = response.body();
                result.extractedLinks = parseLinks(result.content);
                result.success = true;
                
            } catch (Exception e) {
                result.success = false;
            }
            
            return result;
        }
        
        private void rotateProxy() {
            currentProxyIndex = (currentProxyIndex + 1) % proxyList.size();
        }
        
        private List<String> parseLinks(String html) {
            List<String> links = new ArrayList<>();
            Pattern pattern = Pattern.compile("href=\"(/wiki/[^\"]*)\"");
            Matcher matcher = pattern.matcher(html);
            
            while (matcher.find()) {
                links.add("https://en.wikipedia.org" + matcher.group(1));
            }
            
            return links;
        }
    }
    
    public void runDistributedCrawl(List<String> seedUrls, int numMachines) {
        MasterServer master = new MasterServer();
        master.initializeCrawl(seedUrls);
        
        ExecutorService workers = Executors.newFixedThreadPool(numMachines);
        
        for (int i = 0; i < numMachines; i++) {
            final String machineId = "machine-" + i;
            workers.submit(() -> {
                WorkerMachine worker = new WorkerMachine(machineId, master);
                worker.startCrawling();
            });
        }
    }
}

Python

import queue
import threading
import time
from typing import Dict, List, Optional, Set

import requests


class DistributedWikipediaCrawler:
    
    class CrawlTask:
        def __init__(self, url: str, priority: int, scheduled_time: float, retry_count: int):
            self.url = url
            self.priority = priority
            self.scheduled_time = scheduled_time
            self.retry_count = retry_count
    
    class MachineStatus:
        def __init__(self, machine_id: str, ip_address: str):
            self.machine_id = machine_id
            self.ip_address = ip_address
            self.last_heartbeat = time.time()
            self.is_blacklisted = False
            self.crawl_rate = 10
    
    class CrawlResult:
        def __init__(self, url: str):
            self.url = url
            self.content = ""
            self.extracted_links: List[str] = []
            self.timestamp = time.time()
            self.success = False
            self.is_blacklisted = False
    
    class MasterServer:
        def __init__(self):
            self.visited_urls: Set[str] = set()
            self.task_queue: queue.Queue = queue.Queue()
            self.machines: Dict[str, 'DistributedWikipediaCrawler.MachineStatus'] = {}
            self.rate_limits: Dict[str, float] = {}
            self.lock = threading.RLock()
        
        def initialize_crawl(self, seed_urls: List[str]) -> None:
            for url in seed_urls:
                task = DistributedWikipediaCrawler.CrawlTask(url, 1, time.time(), 0)
                self.task_queue.put(task)
        
        def assign_task(self, machine_id: str) -> Optional['DistributedWikipediaCrawler.CrawlTask']:
            self._update_machine_heartbeat(machine_id)
            
            try:
                task = self.task_queue.get(timeout=5)
                domain = self._extract_domain(task.url)
                
                with self.lock:
                    if domain in self.rate_limits and time.time() < self.rate_limits[domain]:
                        task.scheduled_time = self.rate_limits[domain] + 1
                        self.task_queue.put(task)
                        return self.assign_task(machine_id)
                    
                    self.visited_urls.add(task.url)
                    self.rate_limits[domain] = time.time() + 1
                
                return task
            except queue.Empty:
                return None
        
        def submit_results(self, machine_id: str, result: 'DistributedWikipediaCrawler.CrawlResult') -> None:
            self._store_page_content(result)
            
            for link in result.extracted_links:
                with self.lock:
                    if link not in self.visited_urls:
                        task = DistributedWikipediaCrawler.CrawlTask(link, 2, time.time(), 0)
                        self.task_queue.put(task)
        
        def handle_machine_failure(self, machine_id: str) -> None:
            with self.lock:
                if machine_id in self.machines:
                    self.machines[machine_id].is_blacklisted = True
            
            self._redistribute_tasks(machine_id)
        
        def _extract_domain(self, url: str) -> str:
            try:
                from urllib.parse import urlparse
                parsed = urlparse(url)
                return parsed.netloc
            except Exception:
                return "unknown"
        
        def _update_machine_heartbeat(self, machine_id: str) -> None:
            with self.lock:
                if machine_id not in self.machines:
                    self.machines[machine_id] = DistributedWikipediaCrawler.MachineStatus(machine_id, "unknown")
                self.machines[machine_id].last_heartbeat = time.time()
        
        def _store_page_content(self, result: 'DistributedWikipediaCrawler.CrawlResult') -> None:
            # Database storage implementation
            pass
        
        def _redistribute_tasks(self, failed_machine: str) -> None:
            # Implementation for task redistribution
            pass
    
    class WorkerMachine:
        def __init__(self, machine_id: str, master: 'DistributedWikipediaCrawler.MasterServer'):
            self.machine_id = machine_id
            self.master = master
            self.proxy_list = ["proxy1.com", "proxy2.com", "proxy3.com"]
            self.current_proxy_index = 0
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'WikipediaCrawler/1.0 (Educational Purpose)'
            })
        
        def start_crawling(self) -> None:
            while True:
                task = self.master.assign_task(self.machine_id)
                if not task:
                    time.sleep(5)
                    continue
                
                result = self._crawl_page(task.url)
                if result.success:
                    self.master.submit_results(self.machine_id, result)
                elif result.is_blacklisted:
                    self._rotate_proxy()
                    self.master.handle_machine_failure(self.machine_id)
                
                time.sleep(0.1)  # Rate limiting
        
        def _crawl_page(self, url: str) -> 'DistributedWikipediaCrawler.CrawlResult':
            result = DistributedWikipediaCrawler.CrawlResult(url)
            
            try:
                response = self.session.get(url, timeout=30)
                
                if response.status_code in [429, 403]:
                    result.is_blacklisted = True
                    return result
                
                response.raise_for_status()
                result.content = response.text
                result.extracted_links = self._parse_links(result.content)
                result.success = True
                
            except requests.RequestException:
                result.success = False
            
            return result
        
        def _rotate_proxy(self) -> None:
            self.current_proxy_index = (self.current_proxy_index + 1) % len(self.proxy_list)
            proxy = self.proxy_list[self.current_proxy_index]
            self.session.proxies.update({
                'http': f'http://{proxy}',
                'https': f'https://{proxy}'
            })
        
        def _parse_links(self, html: str) -> List[str]:
            import re
            links = []
            pattern = re.compile(r'href="(/wiki/[^"]*)"')
            matches = pattern.findall(html)
            
            for match in matches:
                links.append(f"https://en.wikipedia.org{match}")
            
            return links
    
    def run_distributed_crawl(self, seed_urls: List[str], num_machines: int) -> None:
        master = self.MasterServer()
        master.initialize_crawl(seed_urls)
        
        threads = []
        for i in range(num_machines):
            machine_id = f"machine-{i}"
            worker = self.WorkerMachine(machine_id, master)
            thread = threading.Thread(target=worker.start_crawling)
            thread.daemon = True
            thread.start()
            threads.append(thread)
        
        for thread in threads:
            thread.join()

Complexity

  • ⏰ Time complexity: O(P + E), where P is the number of pages and E is the number of edges (links) between pages. Each page is visited once and each link is processed once
  • 🧺 Space complexity: O(P + E) for storing the visited set, task queue, and extracted links. Distributed across multiple machines reduces per-machine memory usage

Method 2 - Event-Driven Architecture with Change Detection

Intuition

Instead of periodically re-crawling all pages, we can use Wikipedia’s real-time change feeds and API to detect updates and only crawl modified content. This approach significantly reduces bandwidth and processing requirements while maintaining up-to-date data.

Approach

  1. Initial Seed Crawl: Perform complete crawl using Method 1 to establish baseline
  2. Wikipedia API Integration: Subscribe to Wikipedia’s Recent Changes feed (EventStreams) and poll the recentchanges API module (see the sketch after this list)
  3. Event Processing: Process change events to identify new, updated, or deleted pages
  4. Priority Queueing: Assign higher priority to recently changed pages over discovery crawling
  5. Incremental Updates: Update database records incrementally rather than full replacement
  6. Conflict Resolution: Handle concurrent updates and maintain data consistency
  7. Fallback Mechanism: Periodically re-crawl pages that haven’t been updated within a specified timeframe
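
A minimal sketch of step 2, subscribing to Wikimedia's public EventStreams recentchange feed over server-sent events; a production consumer would also handle reconnects and multi-line data frames:

import json
import requests

STREAM_URL = "https://stream.wikimedia.org/v2/stream/recentchange"

def follow_recent_changes(wiki: str = "enwiki"):
    """Yield recent-change events for one wiki from the server-sent-event stream."""
    with requests.get(STREAM_URL, stream=True, timeout=60) as resp:
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue                     # skip heartbeats and SSE comments
            event = json.loads(line[len("data: "):])
            if event.get("wiki") == wiki:
                yield event                  # includes the title, change type, and revision ids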

Code

C++

#include <chrono>
#include <ctime>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
using namespace std;

// Note: `json`, makeApiCall, parseJson, and crawlPage below are placeholders for an
// HTTP client and JSON library (e.g. nlohmann::json); they are not defined here.

class IncrementalWikipediaCrawler {
private:
    struct ChangeEvent {
        string pageTitle;
        string action; // "edit", "new", "delete"
        time_t timestamp;
        int revisionId;
        int priority;
        // Ordering for priority_queue: higher-priority changes are processed first.
        bool operator<(const ChangeEvent& other) const {
            return priority < other.priority;
        }
    };
    
    class ChangeDetector {
    private:
        string apiEndpoint = "https://en.wikipedia.org/w/api.php";
        time_t lastCheckTime;
        
    public:
        vector<ChangeEvent> getRecentChanges(time_t since) {
            vector<ChangeEvent> changes;
            
            // API call to get recent changes
            string url = apiEndpoint + "?action=query&list=recentchanges&rcstart=" 
                        + to_string(since) + "&format=json";
            
            auto response = makeApiCall(url);
            auto changesJson = parseJson(response);
            
            for (const auto& change : changesJson["query"]["recentchanges"]) {
                ChangeEvent event;
                event.pageTitle = change["title"];
                event.action = change["type"];
                event.timestamp = change["timestamp"];
                event.revisionId = change["revid"];
                event.priority = calculatePriority(event);
                changes.push_back(event);
            }
            
            return changes;
        }
        
    private:
        int calculatePriority(const ChangeEvent& event) {
            // Higher priority for recent changes and popular pages
            int basePriority = 1;
            if (event.action == "new") basePriority += 2;
            if (time(nullptr) - event.timestamp < 3600) basePriority += 1; // Recent hour
            return basePriority;
        }
        
        string makeApiCall(const string& url) {
            // HTTP implementation
            return "";
        }
        
        json parseJson(const string& response) {
            // JSON parsing implementation
            return json{};
        }
    };
    
    class IncrementalCrawler {
    private:
        ChangeDetector detector;
        priority_queue<ChangeEvent> changeQueue;
        unordered_map<string, int> pageVersions;
        time_t lastFullCrawl;
        
    public:
        void startIncrementalCrawling() {
            while (true) {
                // Get recent changes
                auto changes = detector.getRecentChanges(lastFullCrawl);
                
                for (const auto& change : changes) {
                    // Check if we need to update this page
                    if (shouldProcessChange(change)) {
                        changeQueue.push(change);
                    }
                }
                
                // Process high-priority changes
                processChanges();
                
                // Sleep before next check
                this_thread::sleep_for(chrono::minutes(5));
            }
        }
        
    private:
        bool shouldProcessChange(const ChangeEvent& change) {
            // Skip if we already have this revision
            if (pageVersions[change.pageTitle] >= change.revisionId) {
                return false;
            }
            
            // Always process new pages and recent edits
            return true;
        }
        
        void processChanges() {
            while (!changeQueue.empty()) {
                auto change = changeQueue.top();
                changeQueue.pop();
                
                if (change.action == "delete") {
                    removePageFromDatabase(change.pageTitle);
                } else {
                    crawlAndUpdatePage(change.pageTitle, change.revisionId);
                }
                
                pageVersions[change.pageTitle] = change.revisionId;
            }
        }
        
        void crawlAndUpdatePage(const string& pageTitle, int revisionId) {
            string url = "https://en.wikipedia.org/wiki/" + urlEncode(pageTitle);
            auto content = crawlPage(url);
            updateDatabase(pageTitle, content, revisionId);
        }
        
        void removePageFromDatabase(const string& pageTitle) {
            // Database deletion implementation
        }
        
        void updateDatabase(const string& pageTitle, const string& content, int revisionId) {
            // Database update implementation
        }
        
        string crawlPage(const string& url) {
            // Page crawling implementation
            return "";
        }
        
        string urlEncode(const string& title) {
            // URL encoding implementation
            return title;
        }
    };
    
public:
    void runIncrementalCrawl() {
        IncrementalCrawler crawler;
        crawler.startIncrementalCrawling();
    }
};

Go

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "sync"
    "time"
)

type ChangeEvent struct {
    PageTitle  string
    Action     string // "edit", "new", "delete"
    Timestamp  time.Time
    RevisionID int
    Priority   int
}

type ChangeDetector struct {
    apiEndpoint   string
    lastCheckTime time.Time
    client        *http.Client
}

func NewChangeDetector() *ChangeDetector {
    return &ChangeDetector{
        apiEndpoint: "https://en.wikipedia.org/w/api.php",
        client:      &http.Client{Timeout: 30 * time.Second},
    }
}

func (cd *ChangeDetector) GetRecentChanges(since time.Time) ([]ChangeEvent, error) {
    var changes []ChangeEvent
    
    url := fmt.Sprintf("%s?action=query&list=recentchanges&rcstart=%s&format=json",
        cd.apiEndpoint, since.Format("2006-01-02T15:04:05Z"))
    
    resp, err := cd.client.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    
    var apiResponse struct {
        Query struct {
            RecentChanges []struct {
                Title     string    `json:"title"`
                Type      string    `json:"type"`
                Timestamp time.Time `json:"timestamp"`
                RevID     int       `json:"revid"`
            } `json:"recentchanges"`
        } `json:"query"`
    }
    
    if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
        return nil, err
    }
    
    for _, change := range apiResponse.Query.RecentChanges {
        event := ChangeEvent{
            PageTitle:  change.Title,
            Action:     change.Type,
            Timestamp:  change.Timestamp,
            RevisionID: change.RevID,
            Priority:   cd.calculatePriority(change.Type, change.Timestamp),
        }
        changes = append(changes, event)
    }
    
    return changes, nil
}

func (cd *ChangeDetector) calculatePriority(action string, timestamp time.Time) int {
    basePriority := 1
    if action == "new" {
        basePriority += 2
    }
    if time.Since(timestamp) < time.Hour {
        basePriority += 1
    }
    return basePriority
}

type IncrementalCrawler struct {
    detector     *ChangeDetector
    changeQueue  chan ChangeEvent
    pageVersions map[string]int
    lastFullCrawl time.Time
    mutex        sync.RWMutex
}

func NewIncrementalCrawler() *IncrementalCrawler {
    return &IncrementalCrawler{
        detector:     NewChangeDetector(),
        changeQueue:  make(chan ChangeEvent, 10000),
        pageVersions: make(map[string]int),
        lastFullCrawl: time.Now(),
    }
}

func (ic *IncrementalCrawler) StartIncrementalCrawling() {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    // Start change processor
    go ic.processChanges()
    
    for {
        select {
        case <-ticker.C:
            changes, err := ic.detector.GetRecentChanges(ic.lastFullCrawl)
            if err != nil {
                log.Printf("Error getting recent changes: %v", err)
                continue
            }
            
            for _, change := range changes {
                if ic.shouldProcessChange(change) {
                    select {
                    case ic.changeQueue <- change:
                    default:
                        log.Printf("Change queue full, dropping change for %s", change.PageTitle)
                    }
                }
            }
            
            ic.lastFullCrawl = time.Now()
        }
    }
}

func (ic *IncrementalCrawler) shouldProcessChange(change ChangeEvent) bool {
    ic.mutex.RLock()
    currentVersion, exists := ic.pageVersions[change.PageTitle]
    ic.mutex.RUnlock()
    
    return !exists || currentVersion < change.RevisionID
}

func (ic *IncrementalCrawler) processChanges() {
    for change := range ic.changeQueue {
        switch change.Action {
        case "delete":
            ic.removePageFromDatabase(change.PageTitle)
        default:
            ic.crawlAndUpdatePage(change.PageTitle, change.RevisionID)
        }
        
        ic.mutex.Lock()
        ic.pageVersions[change.PageTitle] = change.RevisionID
        ic.mutex.Unlock()
    }
}

func (ic *IncrementalCrawler) crawlAndUpdatePage(pageTitle string, revisionID int) {
    // url.PathEscape encodes the title for a URL path; a distinct variable name avoids shadowing the net/url package.
    pageURL := "https://en.wikipedia.org/wiki/" + url.PathEscape(pageTitle)
    content, err := ic.crawlPage(pageURL)
    if err != nil {
        log.Printf("Error crawling page %s: %v", pageTitle, err)
        return
    }
    
    ic.updateDatabase(pageTitle, content, revisionID)
}

func (ic *IncrementalCrawler) crawlPage(url string) (string, error) {
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }
    
    return string(body), nil
}

func (ic *IncrementalCrawler) removePageFromDatabase(pageTitle string) {
    // Database deletion implementation
}

func (ic *IncrementalCrawler) updateDatabase(pageTitle, content string, revisionID int) {
    // Database update implementation
}

func RunIncrementalCrawl() {
    crawler := NewIncrementalCrawler()
    crawler.StartIncrementalCrawling()
}

Java

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;

class IncrementalWikipediaCrawler {
    
    static class ChangeEvent {
        String pageTitle;
        String action; // "edit", "new", "delete"
        Instant timestamp;
        int revisionId;
        int priority;
        
        ChangeEvent(String pageTitle, String action, Instant timestamp, int revisionId, int priority) {
            this.pageTitle = pageTitle;
            this.action = action;
            this.timestamp = timestamp;
            this.revisionId = revisionId;
            this.priority = priority;
        }
    }
    
    static class ChangeDetector {
        private static final String API_ENDPOINT = "https://en.wikipedia.org/w/api.php";
        private final HttpClient client;
        private final ObjectMapper mapper;
        
        public ChangeDetector() {
            this.client = HttpClient.newBuilder()
                    .connectTimeout(Duration.ofSeconds(30))
                    .build();
            this.mapper = new ObjectMapper();
        }
        
        public List<ChangeEvent> getRecentChanges(Instant since) throws Exception {
            List<ChangeEvent> changes = new ArrayList<>();
            
            String url = API_ENDPOINT + "?action=query&list=recentchanges&rcstart=" +
                    since.toString() + "&format=json";
            
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .build();
            
            HttpResponse<String> response = client.send(request, 
                    HttpResponse.BodyHandlers.ofString());
            
            JsonNode root = mapper.readTree(response.body());
            JsonNode recentChanges = root.path("query").path("recentchanges");
            
            for (JsonNode change : recentChanges) {
                String title = change.path("title").asText();
                String type = change.path("type").asText();
                Instant timestamp = Instant.parse(change.path("timestamp").asText());
                int revId = change.path("revid").asInt();
                int priority = calculatePriority(type, timestamp);
                
                changes.add(new ChangeEvent(title, type, timestamp, revId, priority));
            }
            
            return changes;
        }
        
        private int calculatePriority(String action, Instant timestamp) {
            int basePriority = 1;
            if ("new".equals(action)) {
                basePriority += 2;
            }
            if (Duration.between(timestamp, Instant.now()).toHours() < 1) {
                basePriority += 1;
            }
            return basePriority;
        }
    }
    
    static class IncrementalCrawler {
        private final ChangeDetector detector;
        private final BlockingQueue<ChangeEvent> changeQueue;
        private final Map<String, Integer> pageVersions;
        private Instant lastFullCrawl;
        private final ScheduledExecutorService scheduler;
        private final ExecutorService processor;
        
        public IncrementalCrawler() {
            this.detector = new ChangeDetector();
            this.changeQueue = new LinkedBlockingQueue<>(10000);
            this.pageVersions = new ConcurrentHashMap<>();
            this.lastFullCrawl = Instant.now();
            this.scheduler = Executors.newScheduledThreadPool(1);
            this.processor = Executors.newFixedThreadPool(5);
        }
        
        public void startIncrementalCrawling() {
            // Start change processor
            processor.submit(this::processChanges);
            
            // Schedule periodic change detection
            scheduler.scheduleAtFixedRate(this::detectChanges, 0, 5, TimeUnit.MINUTES);
        }
        
        private void detectChanges() {
            try {
                List<ChangeEvent> changes = detector.getRecentChanges(lastFullCrawl);
                
                for (ChangeEvent change : changes) {
                    if (shouldProcessChange(change)) {
                        if (!changeQueue.offer(change)) {
                            System.err.println("Change queue full, dropping change for " + change.pageTitle);
                        }
                    }
                }
                
                lastFullCrawl = Instant.now();
            } catch (Exception e) {
                System.err.println("Error detecting changes: " + e.getMessage());
            }
        }
        
        private boolean shouldProcessChange(ChangeEvent change) {
            Integer currentVersion = pageVersions.get(change.pageTitle);
            return currentVersion == null || currentVersion < change.revisionId;
        }
        
        private void processChanges() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ChangeEvent change = changeQueue.take();
                    
                    switch (change.action) {
                        case "delete":
                            removePageFromDatabase(change.pageTitle);
                            break;
                        default:
                            crawlAndUpdatePage(change.pageTitle, change.revisionId);
                            break;
                    }
                    
                    pageVersions.put(change.pageTitle, change.revisionId);
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        private void crawlAndUpdatePage(String pageTitle, int revisionId) {
            try {
                String url = "https://en.wikipedia.org/wiki/" + 
                        URLEncoder.encode(pageTitle, StandardCharsets.UTF_8);
                String content = crawlPage(url);
                updateDatabase(pageTitle, content, revisionId);
            } catch (Exception e) {
                System.err.println("Error crawling page " + pageTitle + ": " + e.getMessage());
            }
        }
        
        private String crawlPage(String url) throws Exception {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .build();
            
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            
            return response.body();
        }
        
        private void removePageFromDatabase(String pageTitle) {
            // Database deletion implementation
        }
        
        private void updateDatabase(String pageTitle, String content, int revisionId) {
            // Database update implementation
        }
    }
    
    public void runIncrementalCrawl() {
        IncrementalCrawler crawler = new IncrementalCrawler();
        crawler.startIncrementalCrawling();
    }
}

Python

import queue
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List

import requests


class IncrementalWikipediaCrawler:
    
    class ChangeEvent:
        def __init__(self, page_title: str, action: str, timestamp: datetime, revision_id: int, priority: int):
            self.page_title = page_title
            self.action = action  # "edit", "new", "delete"
            self.timestamp = timestamp
            self.revision_id = revision_id
            self.priority = priority

        def __lt__(self, other: 'IncrementalWikipediaCrawler.ChangeEvent') -> bool:
            # Tie-breaker so (priority, event) tuples stay comparable in the PriorityQueue.
            return self.timestamp < other.timestamp
    
    class ChangeDetector:
        def __init__(self):
            self.api_endpoint = "https://en.wikipedia.org/w/api.php"
            self.session = requests.Session()
        
        def get_recent_changes(self, since: datetime) -> List['IncrementalWikipediaCrawler.ChangeEvent']:
            changes = []
            
            params = {
                'action': 'query',
                'list': 'recentchanges',
                'rcstart': since.isoformat(),
                'format': 'json',
                'rclimit': 500
            }
            
            try:
                response = self.session.get(self.api_endpoint, params=params, timeout=30)
                response.raise_for_status()
                data = response.json()
                
                for change in data.get('query', {}).get('recentchanges', []):
                    title = change.get('title', '')
                    action = change.get('type', '')
                    timestamp = datetime.fromisoformat(change.get('timestamp', '').replace('Z', '+00:00'))
                    rev_id = change.get('revid', 0)
                    priority = self._calculate_priority(action, timestamp)
                    
                    event = IncrementalWikipediaCrawler.ChangeEvent(
                        title, action, timestamp, rev_id, priority
                    )
                    changes.append(event)
            
            except requests.RequestException as e:
                print(f"Error getting recent changes: {e}")
            
            return changes
        
        def _calculate_priority(self, action: str, timestamp: datetime) -> int:
            base_priority = 1
            if action == "new":
                base_priority += 2
            if datetime.now(timezone.utc) - timestamp < timedelta(hours=1):
                base_priority += 1
            return base_priority
    
    class IncrementalCrawler:
        def __init__(self):
            self.detector = IncrementalWikipediaCrawler.ChangeDetector()
            self.change_queue: queue.PriorityQueue = queue.PriorityQueue(maxsize=10000)
            self.page_versions: Dict[str, int] = {}
            self.last_full_crawl = datetime.now(timezone.utc)
            self.lock = threading.RLock()
            self.session = requests.Session()
        
        def start_incremental_crawling(self) -> None:
            # Start change processor thread
            processor_thread = threading.Thread(target=self._process_changes)
            processor_thread.daemon = True
            processor_thread.start()
            
            # Main detection loop
            while True:
                try:
                    changes = self.detector.get_recent_changes(self.last_full_crawl)
                    
                    for change in changes:
                        if self._should_process_change(change):
                            try:
                                # Use negative priority for max-heap behavior
                                self.change_queue.put((-change.priority, change), timeout=1)
                            except queue.Full:
                                print(f"Change queue full, dropping change for {change.page_title}")
                    
                    self.last_full_crawl = datetime.now(timezone.utc)
                    time.sleep(300)  # 5 minutes
                    
                except Exception as e:
                    print(f"Error in change detection loop: {e}")
                    time.sleep(60)  # Wait before retry
        
        def _should_process_change(self, change: 'IncrementalWikipediaCrawler.ChangeEvent') -> bool:
            with self.lock:
                current_version = self.page_versions.get(change.page_title, 0)
                return current_version < change.revision_id
        
        def _process_changes(self) -> None:
            while True:
                try:
                    priority, change = self.change_queue.get(timeout=10)
                    
                    if change.action == "delete":
                        self._remove_page_from_database(change.page_title)
                    else:
                        self._crawl_and_update_page(change.page_title, change.revision_id)
                    
                    with self.lock:
                        self.page_versions[change.page_title] = change.revision_id
                    
                    self.change_queue.task_done()
                    
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"Error processing change: {e}")
        
        def _crawl_and_update_page(self, page_title: str, revision_id: int) -> None:
            try:
                url = f"https://en.wikipedia.org/wiki/{requests.utils.quote(page_title)}"
                content = self._crawl_page(url)
                self._update_database(page_title, content, revision_id)
            except Exception as e:
                print(f"Error crawling page {page_title}: {e}")
        
        def _crawl_page(self, url: str) -> str:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        
        def _remove_page_from_database(self, page_title: str) -> None:
            # Database deletion implementation
            pass
        
        def _update_database(self, page_title: str, content: str, revision_id: int) -> None:
            # Database update implementation
            pass
    
    def run_incremental_crawl(self) -> None:
        crawler = self.IncrementalCrawler()
        crawler.start_incremental_crawling()

Complexity

  • ⏰ Time complexity: O(C + U), where C is the number of changes detected and U is the number of unique page updates. Much more efficient than full re-crawl
  • 🧺 Space complexity: O(P + Q) where P is the number of tracked page versions and Q is the change queue size. Significantly reduced compared to full crawl approach

Notes

  • Method 1 provides a comprehensive distributed crawling solution suitable for initial complete Wikipedia capture
  • Method 2 offers an efficient incremental update system for maintaining synchronization with minimal resource usage
  • Both methods handle key challenges: rate limiting, IP blacklisting, fault tolerance, and distributed coordination
  • The system can be extended with additional features like content deduplication, language detection, and quality scoring
  • Production implementation should include monitoring, alerting, and detailed logging for operational visibility
  • Consider using established message queue systems (Apache Kafka, Redis Streams) for better scalability and durability
  • Database design should support efficient partial updates and maintain referential integrity across distributed updates
  • Legal and ethical considerations include respecting Wikipedia’s terms of service and avoiding excessive load on their servers
  • Real-world implementation might benefit from using Wikipedia’s official dumps and APIs rather than web scraping
  • The architecture can be adapted for other large-scale web crawling scenarios beyond Wikipedia